Posted to commits@airflow.apache.org by je...@apache.org on 2024/02/09 17:59:45 UTC

(airflow) branch v2-8-test updated (655f3326ad -> be858a50cf)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a change to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


    from 655f3326ad Fix failing static/unit tests.
     new a970d1af03 fix copy-bug (#34904)
     new dbdec0bf7a Sanitize the conn_id to disallow potential script execution (#32867)
     new 1651ee74bd REST API set description on POST to `/variables` endpoint (#36820)
     new d3b027d914 Fix webserver always redirecting to home page if user was not logged in (#36833)
     new 83d630c7fb Fix bug introduced by replacing spaces by + in run_id (#36877)
     new afd47ae993 typo fix (#36900)
     new 61367fc69f Link to release notes in the upgrade docs (#36923)
     new 8e45528617 fix(dagrun): copy bug (#36855)
     new 07ff8b059b Optimize max_execution_date query in single dag case (#33242)
     new 36cdca5571 fix: disable dryrun autofetch (#36941)
     new 29930bea09 Updated config.yml for environment variable sql_alchemy_connect_args  (#36526)
     new 2096019798 fix: DagRuns with UPSTREAM_FAILED tasks get stuck in the backfill. (#36954)
     new 198dd64f2a add description to queue_when (#36997)
     new 23d9092c29 Secret masker ignores passwords with special chars (#36692)
     new c1a2c3b326 Bugfix Triggering DAG with parameters is mandatory when show_trigger_form_if_no_params is enabled (#37063)
     new a2c9a0a8db Move docs for cron basics to Authoring and Scheduling section (#37049)
     new 0fc68e0ffd Displaying "actual" try number in TaskInstance view (#34635)
     new c60bd0b5df Add clarification about DAG author capabilities to security model (#37141)
     new e8050379c9 Allow running airflow against sqlite in-memory DB for tests (#37144)
     new 5919444b10 Revert the sequence of initializing configuration defaults (#37155)
     new 50f3642bb7 Add airflow version substitution into Docker Compose Howto (#37177)
     new 2817c24433 Adjust graph node layout (#37207)
     new c0c5ab4b6f Fix collapsed grid width, line up selected bar with gantt (#37205)
     new f841e70fc5 fix: PythonVirtualenvOperator crashes if any python_callable function is defined in the same source as DAG (#37165)
     new 6a18db922c Adjust node width based on task name length (#37254)
     new be858a50cf Fix the bug that affected the DAG end date. (#36144)

The 26 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/api/common/mark_tasks.py                   |  5 --
 .../api_connexion/endpoints/variable_endpoint.py   |  3 +-
 airflow/api_connexion/openapi/v1.yaml              |  2 +
 airflow/config_templates/config.yml                | 10 ++-
 airflow/configuration.py                           |  3 +-
 airflow/jobs/backfill_job_runner.py                |  8 +-
 airflow/models/connection.py                       | 39 +++++++++-
 airflow/models/dag.py                              | 69 ++++++++++++-----
 airflow/models/dagbag.py                           | 12 +--
 airflow/models/dagrun.py                           | 66 +++++++++++++++-
 airflow/models/taskinstance.py                     | 12 +++
 airflow/operators/python.py                        | 23 ++++--
 airflow/settings.py                                | 21 +++--
 airflow/utils/file.py                              | 12 +++
 airflow/utils/python_virtualenv_script.jinja2      | 19 ++++-
 airflow/www/auth.py                                |  4 +-
 airflow/www/forms.py                               |  4 +-
 airflow/www/static/js/api/useClearTaskDryRun.ts    |  5 +-
 airflow/www/static/js/api/useMarkTaskDryRun.ts     |  5 +-
 airflow/www/static/js/components/Clipboard.tsx     |  8 +-
 airflow/www/static/js/components/Graph/Edge.tsx    |  6 +-
 airflow/www/static/js/dag/Main.tsx                 |  2 +-
 .../www/static/js/dag/{grid => }/TaskName.test.tsx | 13 +---
 airflow/www/static/js/dag/TaskName.tsx             | 89 ++++++++++++++++++++++
 airflow/www/static/js/dag/details/Header.tsx       |  2 +-
 airflow/www/static/js/dag/details/dagRun/index.tsx |  9 ++-
 .../www/static/js/dag/details/graph/Node.test.tsx  |  2 +-
 airflow/www/static/js/dag/details/graph/Node.tsx   | 86 +++++++++------------
 airflow/www/static/js/dag/details/graph/index.tsx  | 14 ++++
 airflow/www/static/js/dag/details/graph/utils.ts   |  8 ++
 airflow/www/static/js/dag/details/index.tsx        |  2 +-
 .../taskInstance/taskActions/ClearInstance.tsx     |  1 +
 .../taskInstance/taskActions/MarkInstanceAs.tsx    |  1 +
 airflow/www/static/js/dag/grid/TaskName.tsx        | 70 -----------------
 airflow/www/static/js/dag/grid/dagRuns/index.tsx   | 12 ++-
 airflow/www/static/js/dag/grid/index.test.tsx      | 14 +---
 airflow/www/static/js/dag/grid/index.tsx           |  3 +-
 airflow/www/static/js/dag/grid/renderTaskRows.tsx  | 14 +++-
 airflow/www/static/js/types/index.ts               |  1 +
 airflow/www/static/js/utils/graph.ts               |  7 +-
 airflow/www/templates/airflow/dag.html             |  1 +
 airflow/www/templates/airflow/dags.html            |  1 +
 airflow/www/validators.py                          | 28 ++++++-
 airflow/www/views.py                               | 11 ++-
 .../authoring-and-scheduling/cron.rst              | 70 +++++++++++++++++
 .../authoring-and-scheduling/index.rst             |  1 +
 docs/apache-airflow/core-concepts/dag-run.rst      | 36 ---------
 docs/apache-airflow/core-concepts/dags.rst         | 19 +++--
 .../apache-airflow/core-concepts/objectstorage.rst |  2 +-
 docs/apache-airflow/howto/docker-compose/index.rst |  4 +-
 docs/apache-airflow/installation/upgrading.rst     |  5 ++
 docs/apache-airflow/security/security_model.rst    |  6 ++
 docs/conf.py                                       |  1 +
 tests/always/test_connection.py                    | 13 ++--
 .../endpoints/test_dag_run_endpoint.py             |  4 +-
 .../endpoints/test_variable_endpoint.py            | 20 +++--
 tests/api_experimental/client/test_local_client.py |  4 +-
 tests/api_experimental/common/test_mark_tasks.py   | 49 ++++++++----
 tests/core/test_settings.py                        | 24 ++++--
 ... => test_backfill_with_upstream_failed_task.py} |  9 ++-
 tests/jobs/test_backfill_job.py                    | 30 +++++++-
 tests/models/test_cleartasks.py                    |  5 +-
 tests/models/test_connection.py                    | 64 ++++++++++++++++
 tests/models/test_dag.py                           | 86 +++++++++++++++++++++
 tests/utils/log/test_secrets_masker.py             | 18 ++++-
 tests/www/views/test_anonymous_as_admin_role.py    |  4 +-
 tests/www/views/test_views_trigger_dag.py          |  2 +
 67 files changed, 883 insertions(+), 320 deletions(-)
 rename airflow/www/static/js/dag/{grid => }/TaskName.test.tsx (83%)
 create mode 100644 airflow/www/static/js/dag/TaskName.tsx
 delete mode 100644 airflow/www/static/js/dag/grid/TaskName.tsx
 create mode 100644 docs/apache-airflow/authoring-and-scheduling/cron.rst
 copy tests/dags/{test_failing.py => test_backfill_with_upstream_failed_task.py} (74%)


(airflow) 08/26: fix(dagrun): copy bug (#36855)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8e4552861740a472e5953a774ac209f9c0de5116
Author: Muhammad Luqman <48...@users.noreply.github.com>
AuthorDate: Sat Jan 20 14:07:26 2024 +0700

    fix(dagrun): copy bug (#36855)
    
    (cherry picked from commit 6e7085be6173ebec5e4c52c1223309460c632748)
---
 airflow/www/static/js/dag/details/dagRun/index.tsx | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/airflow/www/static/js/dag/details/dagRun/index.tsx b/airflow/www/static/js/dag/details/dagRun/index.tsx
index ed9dce266f..2bbdc5cdb0 100644
--- a/airflow/www/static/js/dag/details/dagRun/index.tsx
+++ b/airflow/www/static/js/dag/details/dagRun/index.tsx
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-import React, { useRef } from "react";
+import React, { useEffect, useRef } from "react";
 import {
   Flex,
   Box,
@@ -65,7 +65,12 @@ const DagRun = ({ runId }: Props) => {
   const offsetTop = useOffsetTop(detailsRef);
 
   const run = dagRuns.find((dr) => dr.runId === runId);
-  const { onCopy, hasCopied } = useClipboard(formatConf(run?.conf));
+  const { onCopy, setValue, hasCopied } = useClipboard(formatConf(run?.conf));
+
+  useEffect(() => {
+    setValue(formatConf(run?.conf));
+  }, [run, setValue]);
+
   if (!run) return null;
   const {
     state,


(airflow) 19/26: Allow running airflow against sqlite in-memory DB for tests (#37144)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit e8050379c91661d9601fc1562063a391f489344b
Author: Ash Berlin-Taylor <as...@apache.org>
AuthorDate: Fri Feb 2 13:51:47 2024 +0000

    Allow running airflow against sqlite in-memory DB for tests (#37144)
    
    This makes no sense for production workloads, but it is useful for
    writing tests of Airflow operators etc.
    
    I also moved the warning from configure_vars to configure_orm so that it
    is checked after local settings have been applied in case that changes
    anything.
    
    (cherry picked from commit 7324400c6b379d22030a6bd54fb14912c1cb291f)
---
 airflow/settings.py         | 21 ++++++++++++++-------
 tests/core/test_settings.py | 24 +++++++++++++++++-------
 2 files changed, 31 insertions(+), 14 deletions(-)

diff --git a/airflow/settings.py b/airflow/settings.py
index 88cd8e5276..9b2815efea 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -192,13 +192,6 @@ def configure_vars():
     global PLUGINS_FOLDER
     global DONOT_MODIFY_HANDLERS
     SQL_ALCHEMY_CONN = conf.get("database", "SQL_ALCHEMY_CONN")
-    if SQL_ALCHEMY_CONN.startswith("sqlite") and not SQL_ALCHEMY_CONN.startswith("sqlite:////"):
-        from airflow.exceptions import AirflowConfigException
-
-        raise AirflowConfigException(
-            f"Cannot use relative path: `{SQL_ALCHEMY_CONN}` to connect to sqlite. "
-            "Please use absolute path such as `sqlite:////tmp/airflow.db`."
-        )
 
     DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER"))
 
@@ -232,6 +225,20 @@ def configure_orm(disable_connection_pool=False, pool_class=None):
     """Configure ORM using SQLAlchemy."""
     from airflow.utils.log.secrets_masker import mask_secret
 
+    if (
+        SQL_ALCHEMY_CONN
+        and SQL_ALCHEMY_CONN.startswith("sqlite")
+        and not SQL_ALCHEMY_CONN.startswith("sqlite:////")
+        # In memory is not useful for production, but useful for writing tests against Airflow for extensions
+        and SQL_ALCHEMY_CONN != "sqlite://"
+    ):
+        from airflow.exceptions import AirflowConfigException
+
+        raise AirflowConfigException(
+            f"Cannot use relative path: `{SQL_ALCHEMY_CONN}` to connect to sqlite. "
+            "Please use absolute path such as `sqlite:////tmp/airflow.db`."
+        )
+
     global Session
     global engine
     if os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true":
diff --git a/tests/core/test_settings.py b/tests/core/test_settings.py
index 79f49b4ef2..55d46cac15 100644
--- a/tests/core/test_settings.py
+++ b/tests/core/test_settings.py
@@ -17,11 +17,12 @@
 # under the License.
 from __future__ import annotations
 
+import contextlib
 import os
 import sys
 import tempfile
 from unittest import mock
-from unittest.mock import MagicMock, call, patch
+from unittest.mock import MagicMock, call
 
 import pytest
 
@@ -216,11 +217,20 @@ class TestUpdatedConfigNames:
         assert session_lifetime_config == default_timeout_minutes
 
 
-def test_sqlite_relative_path():
+@pytest.mark.parametrize(
+    ["value", "expectation"],
+    [
+        (
+            "sqlite:///./relative_path.db",
+            pytest.raises(AirflowConfigException, match=r"Cannot use relative path:"),
+        ),
+        # Should not raise an exception
+        ("sqlite://", contextlib.nullcontext()),
+    ],
+)
+def test_sqlite_relative_path(monkeypatch, value, expectation):
     from airflow import settings
 
-    with patch("airflow.settings.conf.get") as conf_get_mock:
-        conf_get_mock.return_value = "sqlite:///./relative_path.db"
-        with pytest.raises(AirflowConfigException) as exc:
-            settings.configure_vars()
-        assert "Cannot use relative path:" in str(exc.value)
+    monkeypatch.setattr(settings, "SQL_ALCHEMY_CONN", value)
+    with expectation:
+        settings.configure_orm()
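
For context, a minimal sketch of what this change enables: pointing Airflow at
an in-memory sqlite database when writing tests against operators. This is not
part of the patch; it assumes configuration via environment variable in a fresh
process, and that the schema is created by hand.

    import os

    # "sqlite://" (no path) is SQLAlchemy's URI for an in-memory database;
    # after this change it is exempt from the absolute-path check.
    os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = "sqlite://"

    from airflow.utils import db

    # Create the Airflow schema in the throwaway in-memory database.
    db.initdb()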


(airflow) 01/26: fix copy-bug (#34904)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a970d1af038a609de5dbe11bf77f0e45d9112edd
Author: Akash Sharma <35...@users.noreply.github.com>
AuthorDate: Wed Jan 17 03:40:41 2024 +0530

    fix copy-bug (#34904)
    
    Co-authored-by: adaverse <ad...@Akash>
    (cherry picked from commit 3a48d0c47f439b684511e47340fd24cc3188c4e7)
---
 airflow/www/static/js/components/Clipboard.tsx | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/airflow/www/static/js/components/Clipboard.tsx b/airflow/www/static/js/components/Clipboard.tsx
index 0e0a5d451b..11b2547c1e 100644
--- a/airflow/www/static/js/components/Clipboard.tsx
+++ b/airflow/www/static/js/components/Clipboard.tsx
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-import React from "react";
+import React, { useEffect } from "react";
 import {
   Button,
   IconButton,
@@ -43,9 +43,13 @@ export const ClipboardButton = forwardRef(
     },
     ref
   ) => {
-    const { hasCopied, onCopy } = useClipboard(value);
+    const { setValue, hasCopied, onCopy } = useClipboard(value);
     const containerRef = useContainerRef();
 
+    useEffect(() => {
+      setValue(value);
+    }, [value, setValue]);
+
     const commonProps = {
       onClick: onCopy,
       variant,


(airflow) 03/26: REST API set description on POST to `/variables` endpoint (#36820)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 1651ee74bdb3ff37edaa05059346bc95f417c273
Author: Andrey Anshin <An...@taragol.is>
AuthorDate: Wed Jan 17 02:35:48 2024 +0400

    REST API set description on POST to `/variables` endpoint (#36820)
    
    (cherry picked from commit 6d5d3555cbd22a3223e6fc14888442ad5b588e2e)
---
 airflow/api_connexion/endpoints/variable_endpoint.py |  3 +--
 .../endpoints/test_variable_endpoint.py              | 20 ++++++++++++++------
 2 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/airflow/api_connexion/endpoints/variable_endpoint.py b/airflow/api_connexion/endpoints/variable_endpoint.py
index d7ee84c793..a59aa2f4c1 100644
--- a/airflow/api_connexion/endpoints/variable_endpoint.py
+++ b/airflow/api_connexion/endpoints/variable_endpoint.py
@@ -137,8 +137,7 @@ def post_variables() -> Response:
     """Create a variable."""
     try:
         data = variable_schema.load(get_json_request_dict())
-
     except ValidationError as err:
         raise BadRequest("Invalid Variable schema", detail=str(err.messages))
-    Variable.set(data["key"], data["val"])
+    Variable.set(data["key"], data["val"], description=data.get("description", None))
     return variable_schema.dump(data)
diff --git a/tests/api_connexion/endpoints/test_variable_endpoint.py b/tests/api_connexion/endpoints/test_variable_endpoint.py
index 3b75254774..ab9dddf454 100644
--- a/tests/api_connexion/endpoints/test_variable_endpoint.py
+++ b/tests/api_connexion/endpoints/test_variable_endpoint.py
@@ -335,13 +335,21 @@ class TestPatchVariable(TestVariableEndpoint):
 
 
 class TestPostVariables(TestVariableEndpoint):
-    def test_should_create_variable(self, session):
+    @pytest.mark.parametrize(
+        "description",
+        [
+            pytest.param(None, id="not-set"),
+            pytest.param("", id="empty"),
+            pytest.param("Spam Egg", id="desc-set"),
+        ],
+    )
+    def test_should_create_variable(self, description, session):
+        payload = {"key": "var_create", "value": "{}"}
+        if description is not None:
+            payload["description"] = description
         response = self.client.post(
             "/api/v1/variables",
-            json={
-                "key": "var_create",
-                "value": "{}",
-            },
+            json=payload,
             environ_overrides={"REMOTE_USER": "test"},
         )
         assert response.status_code == 200
@@ -350,7 +358,7 @@ class TestPostVariables(TestVariableEndpoint):
         assert response.json == {
             "key": "var_create",
             "value": "{}",
-            "description": None,
+            "description": description,
         }
 
     def test_should_reject_invalid_request(self, session):
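
For illustration, a hedged sketch of the request this enables; the host,
credentials, and basic-auth API backend here are assumptions, not part of the
patch:

    import requests

    resp = requests.post(
        "http://localhost:8080/api/v1/variables",
        json={"key": "var_create", "value": "{}", "description": "Spam Egg"},
        auth=("admin", "admin"),  # assumes the basic_auth API auth backend
    )
    assert resp.status_code == 200
    assert resp.json()["description"] == "Spam Egg"  # previously always None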


(airflow) 06/26: typo fix (#36900)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit afd47ae993444ba00319f4fb9149fa531b50f0fb
Author: Tamara Janina Fingerlin <90...@users.noreply.github.com>
AuthorDate: Fri Jan 19 14:16:05 2024 +0100

    typo fix (#36900)
    
    (cherry picked from commit 03c2bb56c0e71d5fa49292a27e32438a5e0e4567)
---
 docs/apache-airflow/core-concepts/objectstorage.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/apache-airflow/core-concepts/objectstorage.rst b/docs/apache-airflow/core-concepts/objectstorage.rst
index f5b113a861..0ec3bf8a34 100644
--- a/docs/apache-airflow/core-concepts/objectstorage.rst
+++ b/docs/apache-airflow/core-concepts/objectstorage.rst
@@ -33,7 +33,7 @@ of object-name => data. To enable remote access, operations on objects are usual
 
 Airflow provides a generic abstraction on top of object stores, like s3, gcs, and azure blob storage.
 This abstraction allows you to use a variety of object storage systems in your DAGs without having to
-change you code to deal with every different object storage system. In addition, it allows you to use
+change your code to deal with every different object storage system. In addition, it allows you to use
 most of the standard Python modules, like ``shutil``, that can work with file-like objects.
 
 Support for a particular object storage system depends on the providers you have installed. For
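
As a concrete illustration of the abstraction described in the page above (a
sketch, not from the patch: the bucket, key, and connection id are invented,
and it assumes the Amazon provider is installed):

    import shutil

    from airflow.io.path import ObjectStoragePath

    # ObjectStoragePath exposes a file-like open(), so standard modules
    # such as shutil can consume remote objects directly.
    remote = ObjectStoragePath("s3://my-bucket/data.csv", conn_id="aws_default")
    with remote.open("rb") as src, open("/tmp/data.csv", "wb") as dst:
        shutil.copyfileobj(src, dst)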


(airflow) 22/26: Adjust graph node layout (#37207)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2817c244338d4becd01223827a36df09ae5b39c5
Author: Brent Bovenzi <br...@astronomer.io>
AuthorDate: Tue Feb 6 17:24:49 2024 -0500

    Adjust graph node layout (#37207)
    
    (cherry picked from commit 7ea7abc75d9bb8349e95fc330b43657950837d30)
---
 airflow/www/static/js/components/Graph/Edge.tsx    |  6 +-
 .../www/static/js/dag/details/graph/Node.test.tsx  |  1 +
 airflow/www/static/js/dag/details/graph/Node.tsx   | 79 ++++++++++++++--------
 airflow/www/static/js/dag/details/graph/index.tsx  | 14 ++++
 airflow/www/static/js/dag/details/graph/utils.ts   |  8 +++
 airflow/www/static/js/types/index.ts               |  1 +
 6 files changed, 80 insertions(+), 29 deletions(-)

diff --git a/airflow/www/static/js/components/Graph/Edge.tsx b/airflow/www/static/js/components/Graph/Edge.tsx
index 3d9e144795..99b6259055 100644
--- a/airflow/www/static/js/components/Graph/Edge.tsx
+++ b/airflow/www/static/js/components/Graph/Edge.tsx
@@ -32,6 +32,10 @@ const CustomEdge = ({ data }: EdgeProps) => {
   const { colors } = useTheme();
   if (!data) return null;
   const { rest } = data;
+  let strokeWidth = 2;
+  if (rest.isSelected) strokeWidth = 3;
+  if (rest.isZoomedOut) strokeWidth = 5;
+  if (rest.isZoomedOut && rest.isSelected) strokeWidth = 7;
   return (
     <>
       {rest?.labels?.map(({ id, x, y, text, width, height }) => {
@@ -48,7 +52,7 @@ const CustomEdge = ({ data }: EdgeProps) => {
         <LinePath
           key={s.id}
           stroke={rest.isSelected ? colors.blue[400] : colors.gray[400]}
-          strokeWidth={rest.isSelected ? 3 : 2}
+          strokeWidth={strokeWidth}
           x={(d) => d.x || 0}
           y={(d) => d.y || 0}
           data={[s.startPoint, ...(s.bendPoints || []), s.endPoint]}
diff --git a/airflow/www/static/js/dag/details/graph/Node.test.tsx b/airflow/www/static/js/dag/details/graph/Node.test.tsx
index 6e1a4bca13..251f3c6aeb 100644
--- a/airflow/www/static/js/dag/details/graph/Node.test.tsx
+++ b/airflow/www/static/js/dag/details/graph/Node.test.tsx
@@ -52,6 +52,7 @@ const mockNode: NodeProps<CustomNodeProps> = {
     latestDagRunId: "run_id",
     onToggleCollapse: () => {},
     isActive: true,
+    isZoomedOut: false,
   },
   selected: false,
   zIndex: 0,
diff --git a/airflow/www/static/js/dag/details/graph/Node.tsx b/airflow/www/static/js/dag/details/graph/Node.tsx
index 6c72f041ac..7be5ae7b88 100644
--- a/airflow/www/static/js/dag/details/graph/Node.tsx
+++ b/airflow/www/static/js/dag/details/graph/Node.tsx
@@ -46,6 +46,7 @@ export interface CustomNodeProps {
   setupTeardownType?: "setup" | "teardown";
   labelStyle?: string;
   style?: string;
+  isZoomedOut: boolean;
 }
 
 export const BaseNode = ({
@@ -65,6 +66,7 @@ export const BaseNode = ({
     setupTeardownType,
     labelStyle,
     style,
+    isZoomedOut,
   },
 }: NodeProps<CustomNodeProps>) => {
   const { colors } = useTheme();
@@ -92,6 +94,8 @@ export const BaseNode = ({
   if (labelStyle) {
     [, operatorTextColor] = labelStyle.split(":");
   }
+  if (!operatorTextColor || operatorTextColor === "#000;")
+    operatorTextColor = "gray.500";
 
   const nodeBorderColor =
     instance?.state && stateColors[instance.state]
@@ -111,10 +115,15 @@ export const BaseNode = ({
       openDelay={hoverDelay}
     >
       <Box
-        borderRadius={5}
-        borderWidth={isSelected ? 2.5 : 1.5}
+        borderRadius={isZoomedOut ? 10 : 5}
+        borderWidth={(isSelected ? 4 : 2) * (isZoomedOut ? 3 : 1)}
         borderColor={nodeBorderColor}
-        bg={isSelected ? "blue.50" : bg}
+        bg={
+          !task.children?.length && operatorBG
+            ? // Fade the operator color to clash less with the task instance status
+              `color-mix(in srgb, ${operatorBG.replace(";", "")} 80%, white)`
+            : bg
+        }
         height={`${height}px`}
         width={`${width}px`}
         cursor={latestDagRunId ? "cursor" : "default"}
@@ -134,13 +143,16 @@ export const BaseNode = ({
           justifyContent="space-between"
           width={width}
           p={2}
-          flexWrap="wrap"
+          flexWrap={isZoomedOut ? "nowrap" : "wrap"}
+          flexDirection={isZoomedOut ? "row" : "column"}
         >
           <Flex flexDirection="column" width="100%">
             <Flex
               justifyContent="space-between"
               alignItems="center"
               width="100%"
+              fontSize={isZoomedOut ? 25 : undefined}
+              fontWeight="bold"
             >
               <Text noOfLines={1} maxWidth={`calc(${width}px - 8px)`}>
                 {taskName}
@@ -152,37 +164,48 @@ export const BaseNode = ({
                 <ImArrowDownRight2 size={15} color={colors.gray[800]} />
               )}
             </Flex>
-            {!!instance && instance.state && (
-              <Flex alignItems="center">
-                <SimpleStatus state={instance.state} />
-                <Text ml={2} color="gray.500" fontSize="lg">
-                  {instance.state}
-                </Text>
-              </Flex>
-            )}
-            {task?.operator && (
-              <Text
-                noOfLines={1}
-                maxWidth={`calc(${width}px - 12px)`}
-                fontWeight={400}
-                fontSize="md"
-                width="fit-content"
-                borderRadius={5}
-                bg={operatorBG}
-                color={operatorTextColor || "gray.500"}
-                px={1}
-              >
-                {task.operator}
-              </Text>
+            {!isZoomedOut && (
+              <>
+                {!!instance && instance.state && (
+                  <Flex alignItems="center">
+                    <SimpleStatus state={instance.state} />
+                    <Text
+                      ml={2}
+                      color="gray.500"
+                      fontWeight={400}
+                      fontSize="md"
+                    >
+                      {instance.state}
+                    </Text>
+                  </Flex>
+                )}
+                {task?.operator && (
+                  <Text
+                    noOfLines={1}
+                    maxWidth={`calc(${width}px - 12px)`}
+                    fontWeight={400}
+                    fontSize="md"
+                    width="fit-content"
+                    color={operatorTextColor}
+                    px={1}
+                  >
+                    {task.operator}
+                  </Text>
+                )}
+              </>
             )}
           </Flex>
           {!!childCount && (
             <Text
+              noOfLines={1}
+              fontSize={isZoomedOut ? 25 : undefined}
               color="blue.600"
               cursor="pointer"
+              width="150px"
+              fontWeight="bold"
               // Increase the target area to expand/collapse a group
-              p={3}
-              m={-3}
+              p={2}
+              m={-2}
               onClick={(e) => {
                 e.stopPropagation();
                 onToggleCollapse();
diff --git a/airflow/www/static/js/dag/details/graph/index.tsx b/airflow/www/static/js/dag/details/graph/index.tsx
index 29e07d65aa..4fb3d21f6c 100644
--- a/airflow/www/static/js/dag/details/graph/index.tsx
+++ b/airflow/www/static/js/dag/details/graph/index.tsx
@@ -26,6 +26,8 @@ import ReactFlow, {
   MiniMap,
   useReactFlow,
   Panel,
+  useOnViewportChange,
+  Viewport,
 } from "reactflow";
 
 import { useGraphData, useGridData } from "src/api";
@@ -51,6 +53,7 @@ const Graph = ({ openGroupIds, onToggleGroups, hoveredTaskState }: Props) => {
   const { data } = useGraphData();
   const [arrange, setArrange] = useState(data?.arrange || "LR");
   const [hasRendered, setHasRendered] = useState(false);
+  const [isZoomedOut, setIsZoomedOut] = useState(false);
 
   useEffect(() => {
     setArrange(data?.arrange || "LR");
@@ -71,6 +74,13 @@ const Graph = ({ openGroupIds, onToggleGroups, hoveredTaskState }: Props) => {
   const latestDagRunId = dagRuns[dagRuns.length - 1]?.runId;
   const offsetTop = useOffsetTop(graphRef);
 
+  useOnViewportChange({
+    onEnd: (viewport: Viewport) => {
+      if (viewport.zoom < 0.5 && !isZoomedOut) setIsZoomedOut(true);
+      if (viewport.zoom >= 0.5 && isZoomedOut) setIsZoomedOut(false);
+    },
+  });
+
   const { nodes, edges: nodeEdges } = useMemo(
     () =>
       flattenNodes({
@@ -81,6 +91,7 @@ const Graph = ({ openGroupIds, onToggleGroups, hoveredTaskState }: Props) => {
         latestDagRunId,
         groups,
         hoveredTaskState,
+        isZoomedOut,
       }),
     [
       graphData?.children,
@@ -90,6 +101,7 @@ const Graph = ({ openGroupIds, onToggleGroups, hoveredTaskState }: Props) => {
       latestDagRunId,
       groups,
       hoveredTaskState,
+      isZoomedOut,
     ]
   );
 
@@ -97,6 +109,7 @@ const Graph = ({ openGroupIds, onToggleGroups, hoveredTaskState }: Props) => {
   useEffect(() => {
     if (hasRendered) {
       const zoom = getZoom();
+      if (zoom < 0.5) setIsZoomedOut(true);
       fitView({
         duration: 750,
         nodes: selected.taskId ? [{ id: selected.taskId }] : undefined,
@@ -116,6 +129,7 @@ const Graph = ({ openGroupIds, onToggleGroups, hoveredTaskState }: Props) => {
     edges: flatEdges,
     nodes,
     selectedTaskId: selected.taskId,
+    isZoomedOut,
   });
 
   return (
diff --git a/airflow/www/static/js/dag/details/graph/utils.ts b/airflow/www/static/js/dag/details/graph/utils.ts
index ac940d46c7..901b47ab18 100644
--- a/airflow/www/static/js/dag/details/graph/utils.ts
+++ b/airflow/www/static/js/dag/details/graph/utils.ts
@@ -36,6 +36,7 @@ interface FlattenNodesProps {
   openGroupIds: string[];
   onToggleGroups: (groupIds: string[]) => void;
   hoveredTaskState?: string | null;
+  isZoomedOut: boolean;
 }
 
 // Generate a flattened list of nodes for react-flow to render
@@ -48,6 +49,7 @@ export const flattenNodes = ({
   openGroupIds,
   parent,
   hoveredTaskState,
+  isZoomedOut,
 }: FlattenNodesProps) => {
   let nodes: ReactFlowNode<CustomNodeProps>[] = [];
   let edges: ElkExtendedEdge[] = [];
@@ -76,6 +78,7 @@ export const flattenNodes = ({
         isSelected,
         latestDagRunId,
         isActive,
+        isZoomedOut,
         onToggleCollapse: () => {
           let newGroupIds = [];
           if (!node.value.isOpen) {
@@ -115,6 +118,7 @@ export const flattenNodes = ({
         openGroupIds,
         parent: newNode,
         hoveredTaskState,
+        isZoomedOut,
       });
       nodes = [...nodes, ...childNodes];
       edges = [...edges, ...childEdges];
@@ -153,6 +157,7 @@ interface BuildEdgesProps {
   edges?: Edge[];
   nodes: ReactFlowNode<CustomNodeProps>[];
   selectedTaskId?: string | null;
+  isZoomedOut?: boolean;
 }
 
 // Format edge data to what react-flow needs to render successfully
@@ -160,6 +165,7 @@ export const buildEdges = ({
   edges = [],
   nodes,
   selectedTaskId,
+  isZoomedOut,
 }: BuildEdgesProps) =>
   edges
     .map((edge) => ({
@@ -184,6 +190,7 @@ export const buildEdges = ({
           ...e,
           data: {
             rest: {
+              isZoomedOut,
               ...e.data.rest,
               labels: e.data.rest.labels?.map((l) =>
                 l.x && l.y ? { ...l, x: l.x + parentX, y: l.y + parentY } : l
@@ -215,6 +222,7 @@ export const buildEdges = ({
           rest: {
             ...e.data.rest,
             isSelected,
+            isZoomedOut,
           },
         },
       };
diff --git a/airflow/www/static/js/types/index.ts b/airflow/www/static/js/types/index.ts
index a75c1ace23..b9ada90370 100644
--- a/airflow/www/static/js/types/index.ts
+++ b/airflow/www/static/js/types/index.ts
@@ -165,6 +165,7 @@ export interface EdgeData {
     layoutOptions?: LayoutOptions;
     isSetupTeardown?: boolean;
     parentNode?: string;
+    isZoomedOut?: boolean;
   };
 }
 


(airflow) 21/26: Add airflow version substitution into Docker Compose Howto (#37177)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 50f3642bb7d12f1078abf59d65d0dbd540b2bd20
Author: Andrey Anshin <An...@taragol.is>
AuthorDate: Mon Feb 5 18:11:30 2024 +0400

    Add airflow version substitution into Docker Compose Howto (#37177)
    
    (cherry picked from commit 8fb4c71857324fa2366179819830d53f32ef5a3f)
---
 docs/apache-airflow/howto/docker-compose/index.rst | 4 ++--
 docs/conf.py                                       | 1 +
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/docs/apache-airflow/howto/docker-compose/index.rst b/docs/apache-airflow/howto/docker-compose/index.rst
index 1533d0c813..087b480993 100644
--- a/docs/apache-airflow/howto/docker-compose/index.rst
+++ b/docs/apache-airflow/howto/docker-compose/index.rst
@@ -319,14 +319,14 @@ you should do those steps:
 
 .. code-block:: docker
 
-    #image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.6.1}
+    #image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:|version|}
     build: .
 
 2) Create ``Dockerfile`` in the same folder your ``docker-compose.yaml`` file is with content similar to:
 
 .. code-block:: docker
 
-    FROM apache/airflow:2.6.1
+    FROM apache/airflow:|version|
     ADD requirements.txt .
     RUN pip install apache-airflow==${AIRFLOW_VERSION} -r requirements.txt
 
diff --git a/docs/conf.py b/docs/conf.py
index 4f7aaff91d..d33e928d4c 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -341,6 +341,7 @@ if PACKAGE_NAME == "apache-airflow":
         "installation/installing-from-pypi.html",
         "installation/installing-from-sources.html",
         "administration-and-deployment/logging-monitoring/advanced-logging-configuration.html",
+        "howto/docker-compose/index.html",
     ]
 if PACKAGE_NAME.startswith("apache-airflow-providers"):
     manual_substitutions_in_generated_html = ["example-dags.html", "operators.html", "index.html"]
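
For context: ``|version|`` is a Sphinx substitution, and Sphinx does not expand
substitutions inside literal code blocks, which is presumably why the page must
be registered in ``manual_substitutions_in_generated_html``: the substitution
is applied by post-processing the generated HTML, so the
``apache/airflow:|version|`` lines above render with the actual release version.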


(airflow) 20/26: Revert the sequence of initializing configuration defaults (#37155)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 5919444b10ad3f1927d961197089d842d1f93b0d
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Sat Feb 3 15:43:38 2024 +0100

    Revert the sequence of initializing configuration defaults (#37155)
    
    (cherry picked from commit d987bce144ac8e42bd269e1971d0ad4f15dcff95)
---
 airflow/configuration.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/airflow/configuration.py b/airflow/configuration.py
index e4e3dab4b3..f9a5f608eb 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -1985,6 +1985,8 @@ def write_default_airflow_configuration_if_needed() -> AirflowConfigParser:
             FERNET_KEY = _generate_fernet_key()
             conf.remove_option("core", "fernet_key")
             conf.set("core", "fernet_key", FERNET_KEY)
+        pathlib.Path(airflow_config.__fspath__()).touch()
+        make_group_other_inaccessible(airflow_config.__fspath__())
         with open(airflow_config, "w") as file:
             conf.write(
                 file,
@@ -1994,7 +1996,6 @@ def write_default_airflow_configuration_if_needed() -> AirflowConfigParser:
                 extra_spacing=True,
                 only_defaults=True,
             )
-        make_group_other_inaccessible(airflow_config.__fspath__())
     return conf
 
 


(airflow) 09/26: Optimize max_execution_date query in single dag case (#33242)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 07ff8b059b3ebebf07d3c8b28100af5494b1512b
Author: Josh Owen <jo...@flourish.com>
AuthorDate: Mon Jan 22 11:08:25 2024 -0500

    Optimize max_execution_date query in single dag case (#33242)
    
    We can make better use of an index when we're only dealing with one dag, which is a common case.
    
    ---------
    
    Co-authored-by: Elad Kalif <45...@users.noreply.github.com>
    Co-authored-by: Daniel Standish <15...@users.noreply.github.com>
    (cherry picked from commit 10c04a4efcee6b92f8057a5a7dbd21d9ca2de710)
---
 airflow/models/dag.py    | 69 +++++++++++++++++++++++++++-----------
 tests/models/test_dag.py | 86 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 136 insertions(+), 19 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 8d0aef0d5d..492e66265b 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -73,7 +73,7 @@ from sqlalchemy import (
     update,
 )
 from sqlalchemy.ext.associationproxy import association_proxy
-from sqlalchemy.orm import backref, joinedload, relationship
+from sqlalchemy.orm import backref, joinedload, load_only, relationship
 from sqlalchemy.sql import Select, expression
 
 import airflow.templates
@@ -3059,27 +3059,13 @@ class DAG(LoggingMixin):
             session.add(orm_dag)
             orm_dags.append(orm_dag)
 
-        dag_id_to_last_automated_run: dict[str, DagRun] = {}
+        latest_runs: dict[str, DagRun] = {}
         num_active_runs: dict[str, int] = {}
         # Skip these queries entirely if no DAGs can be scheduled to save time.
         if any(dag.timetable.can_be_scheduled for dag in dags):
             # Get the latest automated dag run for each existing dag as a single query (avoid n+1 query)
-            last_automated_runs_subq = (
-                select(DagRun.dag_id, func.max(DagRun.execution_date).label("max_execution_date"))
-                .where(
-                    DagRun.dag_id.in_(existing_dags),
-                    or_(DagRun.run_type == DagRunType.BACKFILL_JOB, DagRun.run_type == DagRunType.SCHEDULED),
-                )
-                .group_by(DagRun.dag_id)
-                .subquery()
-            )
-            last_automated_runs = session.scalars(
-                select(DagRun).where(
-                    DagRun.dag_id == last_automated_runs_subq.c.dag_id,
-                    DagRun.execution_date == last_automated_runs_subq.c.max_execution_date,
-                )
-            )
-            dag_id_to_last_automated_run = {run.dag_id: run for run in last_automated_runs}
+            query = cls._get_latest_runs_query(existing_dags, session)
+            latest_runs = {run.dag_id: run for run in session.scalars(query)}
 
             # Get number of active dagruns for all dags we are processing as a single query.
             num_active_runs = DagRun.active_runs_of_dags(dag_ids=existing_dags, session=session)
@@ -3113,7 +3099,7 @@ class DAG(LoggingMixin):
             orm_dag.timetable_description = dag.timetable.description
             orm_dag.processor_subdir = processor_subdir
 
-            last_automated_run: DagRun | None = dag_id_to_last_automated_run.get(dag.dag_id)
+            last_automated_run: DagRun | None = latest_runs.get(dag.dag_id)
             if last_automated_run is None:
                 last_automated_data_interval = None
             else:
@@ -3250,6 +3236,51 @@ class DAG(LoggingMixin):
         for dag in dags:
             cls.bulk_write_to_db(dag.subdags, processor_subdir=processor_subdir, session=session)
 
+    @classmethod
+    def _get_latest_runs_query(cls, dags, session) -> Query:
+        """
+        Query the database to retrieve the last automated run for each dag.
+
+        :param dags: dags to query
+        :param session: sqlalchemy session object
+        """
+        if len(dags) == 1:
+            # Index optimized fast path to avoid more complicated & slower groupby queryplan
+            existing_dag_id = list(dags)[0].dag_id
+            last_automated_runs_subq = (
+                select(func.max(DagRun.execution_date).label("max_execution_date"))
+                .where(
+                    DagRun.dag_id == existing_dag_id,
+                    DagRun.run_type.in_((DagRunType.BACKFILL_JOB, DagRunType.SCHEDULED)),
+                )
+                .subquery()
+            )
+            query = select(DagRun).where(
+                DagRun.dag_id == existing_dag_id, DagRun.execution_date == last_automated_runs_subq
+            )
+        else:
+            last_automated_runs_subq = (
+                select(DagRun.dag_id, func.max(DagRun.execution_date).label("max_execution_date"))
+                .where(
+                    DagRun.dag_id.in_(dags),
+                    DagRun.run_type.in_((DagRunType.BACKFILL_JOB, DagRunType.SCHEDULED)),
+                )
+                .group_by(DagRun.dag_id)
+                .subquery()
+            )
+            query = select(DagRun).where(
+                DagRun.dag_id == last_automated_runs_subq.c.dag_id,
+                DagRun.execution_date == last_automated_runs_subq.c.max_execution_date,
+            )
+        return query.options(
+            load_only(
+                DagRun.dag_id,
+                DagRun.execution_date,
+                DagRun.data_interval_start,
+                DagRun.data_interval_end,
+            )
+        )
+
     @provide_session
     def sync_to_db(self, processor_subdir: str | None = None, session=NEW_SESSION):
         """
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index f367b00abe..7c337ed965 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -952,6 +952,59 @@ class TestDag:
             for row in session.query(DagModel.last_parsed_time).all():
                 assert row[0] is not None
 
+    def test_bulk_write_to_db_single_dag(self):
+        """
+        Test bulk_write_to_db for a single dag using the index optimized query
+        """
+        clear_db_dags()
+        dags = [DAG(f"dag-bulk-sync-{i}", start_date=DEFAULT_DATE, tags=["test-dag"]) for i in range(1)]
+
+        with assert_queries_count(5):
+            DAG.bulk_write_to_db(dags)
+        with create_session() as session:
+            assert {"dag-bulk-sync-0"} == {row[0] for row in session.query(DagModel.dag_id).all()}
+            assert {
+                ("dag-bulk-sync-0", "test-dag"),
+            } == set(session.query(DagTag.dag_id, DagTag.name).all())
+
+            for row in session.query(DagModel.last_parsed_time).all():
+                assert row[0] is not None
+
+        # Re-sync should do fewer queries
+        with assert_queries_count(8):
+            DAG.bulk_write_to_db(dags)
+        with assert_queries_count(8):
+            DAG.bulk_write_to_db(dags)
+
+    def test_bulk_write_to_db_multiple_dags(self):
+        """
+        Test bulk_write_to_db for multiple dags which does not use the index optimized query
+        """
+        clear_db_dags()
+        dags = [DAG(f"dag-bulk-sync-{i}", start_date=DEFAULT_DATE, tags=["test-dag"]) for i in range(4)]
+
+        with assert_queries_count(5):
+            DAG.bulk_write_to_db(dags)
+        with create_session() as session:
+            assert {"dag-bulk-sync-0", "dag-bulk-sync-1", "dag-bulk-sync-2", "dag-bulk-sync-3"} == {
+                row[0] for row in session.query(DagModel.dag_id).all()
+            }
+            assert {
+                ("dag-bulk-sync-0", "test-dag"),
+                ("dag-bulk-sync-1", "test-dag"),
+                ("dag-bulk-sync-2", "test-dag"),
+                ("dag-bulk-sync-3", "test-dag"),
+            } == set(session.query(DagTag.dag_id, DagTag.name).all())
+
+            for row in session.query(DagModel.last_parsed_time).all():
+                assert row[0] is not None
+
+        # Re-sync should do fewer queries
+        with assert_queries_count(8):
+            DAG.bulk_write_to_db(dags)
+        with assert_queries_count(8):
+            DAG.bulk_write_to_db(dags)
+
     @pytest.mark.parametrize("interval", [None, "@daily"])
     def test_bulk_write_to_db_interval_save_runtime(self, interval):
         mock_active_runs_of_dags = mock.MagicMock(side_effect=DagRun.active_runs_of_dags)
@@ -4082,3 +4135,36 @@ class TestTaskClearingSetupTeardownBehavior:
                 Exception, match="Setup tasks must be followed with trigger rule ALL_SUCCESS."
             ):
                 dag.validate_setup_teardown()
+
+
+def test_get_latest_runs_query_one_dag(dag_maker, session):
+    with dag_maker(dag_id="dag1") as dag1:
+        ...
+    query = DAG._get_latest_runs_query(dags=[dag1], session=session)
+    actual = [x.strip() for x in str(query.compile()).splitlines()]
+    expected = [
+        "SELECT dag_run.id, dag_run.dag_id, dag_run.execution_date, dag_run.data_interval_start, dag_run.data_interval_end",
+        "FROM dag_run",
+        "WHERE dag_run.dag_id = :dag_id_1 AND dag_run.execution_date = (SELECT max(dag_run.execution_date) AS max_execution_date",
+        "FROM dag_run",
+        "WHERE dag_run.dag_id = :dag_id_2 AND dag_run.run_type IN (__[POSTCOMPILE_run_type_1]))",
+    ]
+    assert actual == expected
+
+
+def test_get_latest_runs_query_two_dags(dag_maker, session):
+    with dag_maker(dag_id="dag1") as dag1:
+        ...
+    with dag_maker(dag_id="dag2") as dag2:
+        ...
+    query = DAG._get_latest_runs_query(dags=[dag1, dag2], session=session)
+    actual = [x.strip() for x in str(query.compile()).splitlines()]
+    print("\n".join(actual))
+    expected = [
+        "SELECT dag_run.id, dag_run.dag_id, dag_run.execution_date, dag_run.data_interval_start, dag_run.data_interval_end",
+        "FROM dag_run, (SELECT dag_run.dag_id AS dag_id, max(dag_run.execution_date) AS max_execution_date",
+        "FROM dag_run",
+        "WHERE dag_run.dag_id IN (__[POSTCOMPILE_dag_id_1]) AND dag_run.run_type IN (__[POSTCOMPILE_run_type_1]) GROUP BY dag_run.dag_id) AS anon_1",
+        "WHERE dag_run.dag_id = anon_1.dag_id AND dag_run.execution_date = anon_1.max_execution_date",
+    ]
+    assert actual == expected
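
A note on why the single-dag path is faster (an inference from the query shapes
in the tests above, not a claim from the commit): with a fixed dag_id,
max(execution_date) becomes a plain scalar subquery that the database can
answer with one descending scan of the (dag_id, execution_date) index backing
dag_run's uniqueness constraint, whereas the multi-dag form needs a GROUP BY
aggregate materialized as a derived table and joined back to dag_run.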


(airflow) 18/26: Add clarification about DAG author capabilities to security model (#37141)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c60bd0b5df0b5f7cd5f6885e1faed44e25cd6d99
Author: Amogh Desai <am...@gmail.com>
AuthorDate: Fri Feb 2 12:58:53 2024 +0530

    Add clarification about DAG author capabilities to security model (#37141)
    
    (cherry picked from commit e1b032d05b6fba0766a7d81f8af6bc2e2cc3d0d6)
---
 docs/apache-airflow/security/security_model.rst | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/docs/apache-airflow/security/security_model.rst b/docs/apache-airflow/security/security_model.rst
index 6200fff8f0..62a614110b 100644
--- a/docs/apache-airflow/security/security_model.rst
+++ b/docs/apache-airflow/security/security_model.rst
@@ -209,6 +209,12 @@ The Deployment Manager might decide to introduce additional control mechanisms t
 executing arbitrary code. This is all fully in hands of the Deployment Manager and it is discussed in the
 following chapter.
 
+Access to All DAGs
+........................................................................
+
+All DAG authors have access to all DAGs in the airflow deployment. This means that they can view, modify,
+and update any DAG without restrictions at any time.
+
 Responsibilities of Deployment Managers
 ---------------------------------------
 


(airflow) 16/26: Move docs for cron basics to Authoring and Scheduling section (#37049)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a2c9a0a8db5c2e7ac514fcabfc127fb080153e27
Author: Keto D. Zhang <ke...@gmail.com>
AuthorDate: Wed Jan 31 04:01:35 2024 -0800

    Move docs for cron basics to Authoring and Scheduling section (#37049)
    
    (cherry picked from commit 4ca8d4eda2552ef9ae49c5372d5dba96b56e7a17)
---
 .../authoring-and-scheduling/cron.rst              | 70 ++++++++++++++++++++++
 .../authoring-and-scheduling/index.rst             |  1 +
 docs/apache-airflow/core-concepts/dag-run.rst      | 36 -----------
 docs/apache-airflow/core-concepts/dags.rst         | 19 ++++--
 4 files changed, 84 insertions(+), 42 deletions(-)

diff --git a/docs/apache-airflow/authoring-and-scheduling/cron.rst b/docs/apache-airflow/authoring-and-scheduling/cron.rst
new file mode 100644
index 0000000000..364c4608ec
--- /dev/null
+++ b/docs/apache-airflow/authoring-and-scheduling/cron.rst
@@ -0,0 +1,70 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Cron & Time Intervals
+======================
+You may set your DAG to run on a simple schedule by setting its ``schedule`` argument to either a
+`cron expression <https://en.wikipedia.org/wiki/Cron#CRON_expression>`_, a ``datetime.timedelta`` object,
+or one of the :ref:`cron-presets`.
+
+.. code-block:: python
+
+    from airflow.models.dag import DAG
+
+    import datetime
+
+    dag = DAG("regular_interval_cron_example", schedule="0 0 * * *", ...)
+
+    dag = DAG("regular_interval_cron_preset_example", schedule="@daily", ...)
+
+    dag = DAG("regular_interval_timedelta_example", schedule=datetime.timedelta(days=1), ...)
+.. _cron-presets:
+
+Cron Presets
+''''''''''''
+For more elaborate scheduling requirements, you can implement a :doc:`custom timetable <../authoring-and-scheduling/timetable>`.
+Note that Airflow parses cron expressions with the croniter library which supports an extended syntax for cron strings. See their documentation `in github <https://github.com/kiorky/croniter>`_.
+For example, you can create a DAG schedule to run at 12AM on the first Monday of the month with their extended cron syntax: ``0 0 * * MON#1``.
+
+.. tip::
+    You can use an online editor for CRON expressions such as `Crontab guru <https://crontab.guru/>`_
+
++----------------+--------------------------------------------------------------------+-----------------+
+| preset         | meaning                                                            | cron            |
++================+====================================================================+=================+
+| ``None``       | Don't schedule, use for exclusively "externally triggered" DAGs    |                 |
++----------------+--------------------------------------------------------------------+-----------------+
+| ``@once``      | Schedule once and only once                                        |                 |
++----------------+--------------------------------------------------------------------+-----------------+
+| ``@continuous``| Run as soon as the previous run finishes                           |                 |
++----------------+--------------------------------------------------------------------+-----------------+
+| ``@hourly``    | Run once an hour at the end of the hour                            | ``0 * * * *``   |
++----------------+--------------------------------------------------------------------+-----------------+
+| ``@daily``     | Run once a day at midnight (24:00)                                 | ``0 0 * * *``   |
++----------------+--------------------------------------------------------------------+-----------------+
+| ``@weekly``    | Run once a week at midnight (24:00) on Sunday                      | ``0 0 * * 0``   |
++----------------+--------------------------------------------------------------------+-----------------+
+| ``@monthly``   | Run once a month at midnight (24:00) of the first day of the month | ``0 0 1 * *``   |
++----------------+--------------------------------------------------------------------+-----------------+
+| ``@quarterly`` | Run once a quarter at midnight (24:00) on the first day            | ``0 0 1 */3 *`` |
++----------------+--------------------------------------------------------------------+-----------------+
+| ``@yearly``    | Run once a year at midnight (24:00) of January 1                   | ``0 0 1 1 *``   |
++----------------+--------------------------------------------------------------------+-----------------+
+
+Your DAG will be instantiated for each schedule along with a corresponding
+DAG Run entry in the database backend.
diff --git a/docs/apache-airflow/authoring-and-scheduling/index.rst b/docs/apache-airflow/authoring-and-scheduling/index.rst
index db1ef3726c..1a042918fc 100644
--- a/docs/apache-airflow/authoring-and-scheduling/index.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/index.rst
@@ -39,6 +39,7 @@ It's recommended that you first review the pages in :doc:`core concepts </core-c
 .. toctree::
     :maxdepth: 2
 
+    cron
     timezone
     datasets
     timetable
diff --git a/docs/apache-airflow/core-concepts/dag-run.rst b/docs/apache-airflow/core-concepts/dag-run.rst
index e477561e0f..d272a1cd04 100644
--- a/docs/apache-airflow/core-concepts/dag-run.rst
+++ b/docs/apache-airflow/core-concepts/dag-run.rst
@@ -44,42 +44,6 @@ There are two possible terminal states for the DAG Run:
 
 DAGs that have a currently running DAG run can be shown on the UI dashboard in the "Running" tab. Similarly, DAGs whose latest DAG run is marked as failed can be found on the "Failed" tab.
 
-Cron Presets
-''''''''''''
-
-You may set your DAG to run on a simple schedule by setting its ``schedule`` argument to either a
-`cron expression <https://en.wikipedia.org/wiki/Cron#CRON_expression>`_, a ``datetime.timedelta`` object,
-or one of the following cron "presets". For more elaborate scheduling requirements, you can implement a :doc:`custom timetable <../authoring-and-scheduling/timetable>`. Note that Airflow parses cron expressions with the croniter library which supports an extended syntax for cron strings. See their documentation `in github <https://github.com/kiorky/croniter>`_. For example, you can create a DAG schedule to run at 12AM on the first Monday of the month with their extended cron syntax: ``0  [...]
-
-.. tip::
-    You can use an online editor for CRON expressions such as `Crontab guru <https://crontab.guru/>`_
-
-+----------------+--------------------------------------------------------------------+-----------------+
-| preset         | meaning                                                            | cron            |
-+================+====================================================================+=================+
-| ``None``       | Don't schedule, use for exclusively "externally triggered" DAGs    |                 |
-+----------------+--------------------------------------------------------------------+-----------------+
-| ``@once``      | Schedule once and only once                                        |                 |
-+----------------+--------------------------------------------------------------------+-----------------+
-| ``@continuous``| Run as soon as the previous run finishes                           |                 |
-+----------------+--------------------------------------------------------------------+-----------------+
-| ``@hourly``    | Run once an hour at the end of the hour                            | ``0 * * * *``   |
-+----------------+--------------------------------------------------------------------+-----------------+
-| ``@daily``     | Run once a day at midnight (24:00)                                 | ``0 0 * * *``   |
-+----------------+--------------------------------------------------------------------+-----------------+
-| ``@weekly``    | Run once a week at midnight (24:00) on Sunday                      | ``0 0 * * 0``   |
-+----------------+--------------------------------------------------------------------+-----------------+
-| ``@monthly``   | Run once a month at midnight (24:00) of the first day of the month | ``0 0 1 * *``   |
-+----------------+--------------------------------------------------------------------+-----------------+
-| ``@quarterly`` | Run once a quarter at midnight (24:00) on the first day            | ``0 0 1 */3 *`` |
-+----------------+--------------------------------------------------------------------+-----------------+
-| ``@yearly``    | Run once a year at midnight (24:00) of January 1                   | ``0 0 1 1 *``   |
-+----------------+--------------------------------------------------------------------+-----------------+
-
-Your DAG will be instantiated for each schedule along with a corresponding
-DAG Run entry in the database backend.
-
-
 .. _data-interval:
 
 Data Interval
diff --git a/docs/apache-airflow/core-concepts/dags.rst b/docs/apache-airflow/core-concepts/dags.rst
index 2a7829971e..a191f874a5 100644
--- a/docs/apache-airflow/core-concepts/dags.rst
+++ b/docs/apache-airflow/core-concepts/dags.rst
@@ -190,18 +190,20 @@ DAGs do not *require* a schedule, but it's very common to define one. You define
     with DAG("my_daily_dag", schedule="@daily"):
         ...
 
-The ``schedule`` argument takes any value that is a valid `Crontab <https://en.wikipedia.org/wiki/Cron>`_ schedule value, so you could also do::
+There are various valid values for the ``schedule`` argument::
 
     with DAG("my_daily_dag", schedule="0 0 * * *"):
         ...
 
-.. tip::
+    with DAG("my_one_time_dag", schedule="@once"):
+        ...
 
-    For more information on ``schedule`` values, see :doc:`DAG Run <dag-run>`.
+    with DAG("my_continuous_dag", schedule="@continuous"):
+        ...
 
-    If ``schedule`` is not enough to express the DAG's schedule, see :doc:`Timetables </howto/timetable>`.
-    For more information on ``logical date``, see :ref:`data-interval` and
-    :ref:`faq:what-does-execution-date-mean`.
+.. tip::
+
+    For more information on different types of scheduling, see :doc:`/authoring-and-scheduling/index`.
 
 Every time you run a DAG, you are creating a new instance of that DAG which
 Airflow calls a :doc:`DAG Run <dag-run>`. DAG Runs can run in parallel for the
@@ -237,6 +239,11 @@ schedule interval put in place, the logical date is going to indicate the time
 at which it marks the start of the data interval, where the DAG run's start
 date would then be the logical date + scheduled interval.
 
+.. tip::
+
+    For more information on ``logical date``, see :ref:`data-interval` and
+    :ref:`faq:what-does-execution-date-mean`.
+
 DAG Assignment
 --------------
 

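Since the presets table now lives in the Authoring and Scheduling section, a quick way to sanity-check a cron schedule is the croniter package, which the removed text notes Airflow itself uses to parse cron strings. A minimal sketch, independent of any Airflow API:

    from datetime import datetime

    from croniter import croniter

    # "@daily" is the preset name for the cron expression "0 0 * * *"
    it = croniter("0 0 * * *", datetime(2024, 1, 1))
    print(it.get_next(datetime))  # 2024-01-02 00:00:00
    print(it.get_next(datetime))  # 2024-01-03 00:00:00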

(airflow) 14/26: Secret masker ignores passwords with special chars (#36692)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 23d9092c29b8fd2c22c6d9acfba380c8cf591312
Author: Aritra Basu <24...@users.noreply.github.com>
AuthorDate: Mon Jan 29 17:53:45 2024 +0530

    Secret masker ignores passwords with special chars (#36692)
    
    * Secret masker ignores passwords with special chars #36688
    
    Connection URI generation uses quote to escape special chars in the
    password and certain other fields; because of this, when the
    connection object is passed through the masker, the quoted
    string was previously skipped.
    
    * Added a test for the logging change
    
    (cherry picked from commit e8538493a9f74f61834b19e840c7b9987269d45d)
---
 airflow/models/connection.py           |  2 ++
 tests/always/test_connection.py        | 13 ++++++++-----
 tests/utils/log/test_secrets_masker.py | 18 +++++++++++++++++-
 3 files changed, 27 insertions(+), 6 deletions(-)

diff --git a/airflow/models/connection.py b/airflow/models/connection.py
index 0af3f13768..d622a221de 100644
--- a/airflow/models/connection.py
+++ b/airflow/models/connection.py
@@ -173,6 +173,7 @@ class Connection(Base, LoggingMixin):
 
         if self.password:
             mask_secret(self.password)
+            mask_secret(quote(self.password))
 
     @staticmethod
     def _validate_extra(extra, conn_id) -> None:
@@ -206,6 +207,7 @@ class Connection(Base, LoggingMixin):
     def on_db_load(self):
         if self.password:
             mask_secret(self.password)
+            mask_secret(quote(self.password))
 
     def parse_from_uri(self, **uri):
         """Use uri parameter in constructor, this method is deprecated."""
diff --git a/tests/always/test_connection.py b/tests/always/test_connection.py
index e01907a8d3..ab2e26c6ad 100644
--- a/tests/always/test_connection.py
+++ b/tests/always/test_connection.py
@@ -22,6 +22,7 @@ import os
 import re
 from collections import namedtuple
 from unittest import mock
+from urllib.parse import quote
 
 import pytest
 import sqlalchemy
@@ -362,6 +363,7 @@ class TestConnection:
         expected_calls = []
         if test_config.test_conn_attributes.get("password"):
             expected_calls.append(mock.call(test_config.test_conn_attributes["password"]))
+            expected_calls.append(mock.call(quote(test_config.test_conn_attributes["password"])))
 
         if test_config.test_conn_attributes.get("extra_dejson"):
             expected_calls.append(mock.call(test_config.test_conn_attributes["extra_dejson"]))
@@ -592,7 +594,7 @@ class TestConnection:
     @mock.patch.dict(
         "os.environ",
         {
-            "AIRFLOW_CONN_TEST_URI": "postgresql://username:password@ec2.compute.com:5432/the_database",
+            "AIRFLOW_CONN_TEST_URI": "postgresql://username:password%21@ec2.compute.com:5432/the_database",
         },
     )
     def test_using_env_var(self):
@@ -600,10 +602,10 @@ class TestConnection:
         assert "ec2.compute.com" == conn.host
         assert "the_database" == conn.schema
         assert "username" == conn.login
-        assert "password" == conn.password
+        assert "password!" == conn.password
         assert 5432 == conn.port
 
-        self.mask_secret.assert_called_once_with("password")
+        self.mask_secret.assert_has_calls([mock.call("password!"), mock.call(quote("password!"))])
 
     @mock.patch.dict(
         "os.environ",
@@ -717,7 +719,7 @@ class TestConnection:
             conn = Connection(
                 conn_id=f"test-{os.getpid()}",
                 conn_type="http",
-                password="s3cr3t",
+                password="s3cr3t!",
                 extra='{"apikey":"masked too"}',
             )
             session.add(conn)
@@ -733,7 +735,8 @@ class TestConnection:
 
             assert self.mask_secret.mock_calls == [
                 # We should have called it _again_ when loading from the DB
-                mock.call("s3cr3t"),
+                mock.call("s3cr3t!"),
+                mock.call(quote("s3cr3t!")),
                 mock.call({"apikey": "masked too"}),
             ]
         finally:
diff --git a/tests/utils/log/test_secrets_masker.py b/tests/utils/log/test_secrets_masker.py
index 4657a7c1f3..8478d01d2a 100644
--- a/tests/utils/log/test_secrets_masker.py
+++ b/tests/utils/log/test_secrets_masker.py
@@ -30,6 +30,7 @@ from unittest.mock import patch
 import pytest
 
 from airflow import settings
+from airflow.models import Connection
 from airflow.utils.log.secrets_masker import (
     RedactedIO,
     SecretsMasker,
@@ -80,7 +81,6 @@ def logger(caplog):
     logger.addFilter(filt)
 
     filt.add_mask("password")
-
     return logger
 
 
@@ -340,6 +340,22 @@ class TestSecretsMasker:
         assert caplog.text == f"INFO State: {expected}\n"
         assert "TypeError" not in caplog.text
 
+    def test_masking_quoted_strings_in_connection(self, logger, caplog):
+        secrets_masker = [fltr for fltr in logger.filters if isinstance(fltr, SecretsMasker)][0]
+        with patch("airflow.utils.log.secrets_masker._secrets_masker", return_value=secrets_masker):
+            test_conn_attributes = dict(
+                conn_type="scheme",
+                host="host/location",
+                schema="schema",
+                login="user",
+                password="should_be_hidden!",
+                port=1234,
+                extra=None,
+            )
+            conn = Connection(**test_conn_attributes)
+            logger.info(conn.get_uri())
+            assert "should_be_hidden" not in caplog.text
+
 
 class TestShouldHideValueForKey:
     @pytest.mark.parametrize(

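To see why both the raw and the quoted password need masking, consider how quote() rewrites special characters; a minimal sketch using only the standard library, with the mask_secret calls mirroring the patch above:

    from urllib.parse import quote

    password = "s3cr3t!"
    print(quote(password))  # s3cr3t%21 -- the form embedded in a connection URI

    # The patch therefore registers both spellings with the masker:
    #   mask_secret(password)         # catches "s3cr3t!" in logs
    #   mask_secret(quote(password))  # catches "s3cr3t%21", e.g. in a logged URI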

(airflow) 15/26: Bugfix Triggering DAG with parameters is mandatory when show_trigger_form_if_no_params is enabled (#37063)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c1a2c3b326f5b50ae95dcda72f81c997392d5da6
Author: Jens Scheffler <95...@users.noreply.github.com>
AuthorDate: Mon Jan 29 13:50:56 2024 +0100

    Bugfix Triggering DAG with parameters is mandatory when show_trigger_form_if_no_params is enabled (#37063)
    
    * Add a requested default configuration to the form call so triggering can start w/o form display
    
    * Follow-up bugfix - only transfer "bad" submitted form values when the key exists
    
    (cherry picked from commit 1197f2ff654e2fdafbb9d410a8d99cb6d6d3f8e6)
---
 airflow/www/templates/airflow/dag.html  | 1 +
 airflow/www/templates/airflow/dags.html | 1 +
 airflow/www/views.py                    | 3 ++-
 3 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index 8d39d05a40..8d77db0181 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -231,6 +231,7 @@
                   <input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
                   <input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
                   <input type="hidden" name="unpause" value="True">
+                  <input type="hidden" name="conf" value="{}">
                   <!-- for task instance detail pages, dag_id is still a query param -->
                   {% if 'dag_id' in request.args %}
                     <input type="hidden" name="origin" value="{{ url_for(request.endpoint, **request.args) }}">
diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html
index 93d03dfdcf..0acd68b5a1 100644
--- a/airflow/www/templates/airflow/dags.html
+++ b/airflow/www/templates/airflow/dags.html
@@ -377,6 +377,7 @@
                           <input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
                           <input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
                           <input type="hidden" name="unpause" value="True">
+                          <input type="hidden" name="conf" value="{}">
                           <button type="submit" class="dropdown-form-btn">Trigger DAG</button>
                         </form>
                       </li>
diff --git a/airflow/www/views.py b/airflow/www/views.py
index f2ea54e81a..4bf06d35c7 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2188,7 +2188,8 @@ class Airflow(AirflowBaseView):
             form = DateTimeForm(data={"execution_date": execution_date})
             # Take over "bad" submitted fields for new form display
             for k, v in form_fields.items():
-                form_fields[k]["value"] = run_conf[k]
+                if k in run_conf:
+                    form_fields[k]["value"] = run_conf[k]
             return self.render_template(
                 "airflow/trigger.html",
                 form_fields=form_fields,

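The one-line guard matters because form_fields can contain keys that were never submitted in run_conf; a minimal sketch with hypothetical field names:

    form_fields = {"param_a": {"value": None}, "param_b": {"value": None}}
    run_conf = {"param_a": 42}  # "param_b" was never submitted

    for k in form_fields:
        if k in run_conf:  # without this guard, run_conf["param_b"] raises KeyError
            form_fields[k]["value"] = run_conf[k]

    print(form_fields)  # {'param_a': {'value': 42}, 'param_b': {'value': None}}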

(airflow) 02/26: Sanitize the conn_id to disallow potential script execution (#32867)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit dbdec0bf7a8c1072051a546677701bbeb3a81347
Author: andylamp <an...@users.noreply.github.com>
AuthorDate: Wed Jan 17 00:17:45 2024 +0200

    Sanitize the conn_id to disallow potential script execution (#32867)
    
    (cherry picked from commit 71f422c7bc217129ac8614f14e6aeb586c6c88da)
---
 airflow/models/connection.py    | 37 ++++++++++++++++++++++--
 airflow/www/forms.py            |  4 +--
 airflow/www/validators.py       | 28 +++++++++++++++++-
 tests/models/test_connection.py | 64 +++++++++++++++++++++++++++++++++++++++++
 4 files changed, 128 insertions(+), 5 deletions(-)

diff --git a/airflow/models/connection.py b/airflow/models/connection.py
index 4e8e3c7aaf..0af3f13768 100644
--- a/airflow/models/connection.py
+++ b/airflow/models/connection.py
@@ -24,6 +24,7 @@ from json import JSONDecodeError
 from typing import Any
 from urllib.parse import parse_qsl, quote, unquote, urlencode, urlsplit
 
+import re2
 from sqlalchemy import Boolean, Column, Integer, String, Text
 from sqlalchemy.orm import declared_attr, reconstructor, synonym
 
@@ -38,6 +39,13 @@ from airflow.utils.log.secrets_masker import mask_secret
 from airflow.utils.module_loading import import_string
 
 log = logging.getLogger(__name__)
+# sanitize the `conn_id` pattern by allowing alphanumeric characters plus
+# the symbols #,!,-,_,.,:,\,/ and () requiring at least one match.
+#
+# You can try the regex here: https://regex101.com/r/69033B/1
+RE_SANITIZE_CONN_ID = re2.compile(r"^[\w\#\!\(\)\-\.\:\/\\]{1,}$")
+# the conn ID max len should be 250
+CONN_ID_MAX_LEN: int = 250
 
 
 def parse_netloc_to_hostname(*args, **kwargs):
@@ -46,10 +54,35 @@ def parse_netloc_to_hostname(*args, **kwargs):
     return _parse_netloc_to_hostname(*args, **kwargs)
 
 
+def sanitize_conn_id(conn_id: str | None, max_length=CONN_ID_MAX_LEN) -> str | None:
+    r"""Sanitizes the connection id and allows only specific characters to be within.
+
+    Namely, it allows alphanumeric characters plus the symbols #,!,-,_,.,:,\,/ and () from 1 and up to
+    250 consecutive matches. If desired, the max length can be adjusted by setting `max_length`.
+
+    You can try to play with the regex here: https://regex101.com/r/69033B/1
+
+    The character selection prevents the injection of JavaScript or other
+    executable content, avoiding unexpected behaviour in the front-end.
+
+    :param conn_id: The connection id to sanitize.
+    :param max_length: The max length of the connection ID, by default it is 250.
+    :return: the sanitized string, `None` otherwise.
+    """
+    # return `None` if `conn_id` is not a string, exceeds the max length, or fails to match the pattern.
+    if (not isinstance(conn_id, str) or len(conn_id) > max_length) or (
+        res := re2.match(RE_SANITIZE_CONN_ID, conn_id)
+    ) is None:
+        return None
+
+    # if we reach here, then we matched something, return the first match
+    return res.group(0)
+
+
 # Python automatically converts all letters to lowercase in hostname
 # See: https://issues.apache.org/jira/browse/AIRFLOW-3615
 def _parse_netloc_to_hostname(uri_parts):
-    """Parse a URI string to get correct Hostname."""
+    """Parse a URI string to get the correct Hostname."""
     hostname = unquote(uri_parts.hostname or "")
     if "/" in hostname:
         hostname = uri_parts.netloc
@@ -115,7 +148,7 @@ class Connection(Base, LoggingMixin):
         uri: str | None = None,
     ):
         super().__init__()
-        self.conn_id = conn_id
+        self.conn_id = sanitize_conn_id(conn_id)
         self.description = description
         if extra and not isinstance(extra, str):
             extra = json.dumps(extra)
diff --git a/airflow/www/forms.py b/airflow/www/forms.py
index 8a8f69cf44..aa5d3a6249 100644
--- a/airflow/www/forms.py
+++ b/airflow/www/forms.py
@@ -41,7 +41,7 @@ from airflow.configuration import conf
 from airflow.providers_manager import ProvidersManager
 from airflow.utils import timezone
 from airflow.utils.types import DagRunType
-from airflow.www.validators import ReadOnly, ValidKey
+from airflow.www.validators import ReadOnly, ValidConnID
 from airflow.www.widgets import (
     AirflowDateTimePickerROWidget,
     AirflowDateTimePickerWidget,
@@ -221,7 +221,7 @@ def create_connection_form_class() -> type[DynamicForm]:
 
         conn_id = StringField(
             lazy_gettext("Connection Id"),
-            validators=[InputRequired(), ValidKey()],
+            validators=[InputRequired(), ValidConnID()],
             widget=BS3TextFieldWidget(),
         )
         conn_type = SelectField(
diff --git a/airflow/www/validators.py b/airflow/www/validators.py
index ce273308df..8deacb8763 100644
--- a/airflow/www/validators.py
+++ b/airflow/www/validators.py
@@ -22,6 +22,7 @@ from json import JSONDecodeError
 
 from wtforms.validators import EqualTo, ValidationError
 
+from airflow.models.connection import CONN_ID_MAX_LEN, sanitize_conn_id
 from airflow.utils import helpers
 
 
@@ -85,7 +86,7 @@ class ValidKey:
     Validates values that will be used as keys.
 
     :param max_length:
-        The maximum length of the given key
+        The maximum allowed length of the given key
     """
 
     def __init__(self, max_length=200):
@@ -108,3 +109,28 @@ class ReadOnly:
 
     def __call__(self, form, field):
         field.flags.readonly = True
+
+
+class ValidConnID:
+    """
+    Validates the connection ID adheres to the desired format.
+
+    :param max_length:
+        The maximum allowed length of the given Connection ID.
+    """
+
+    message = (
+        "Connection ID must be alphanumeric characters plus dashes, dots, hashes, colons, semicolons, "
+        "underscores, exclamation marks, and parentheses"
+    )
+
+    def __init__(
+        self,
+        max_length: int = CONN_ID_MAX_LEN,
+    ):
+        self.max_length = max_length
+
+    def __call__(self, form, field):
+        if field.data:
+            if sanitize_conn_id(field.data, self.max_length) is None:
+                raise ValidationError(f"{self.message}, with a length from 1 up to {self.max_length}")
diff --git a/tests/models/test_connection.py b/tests/models/test_connection.py
index cac38c1451..21e5682c8d 100644
--- a/tests/models/test_connection.py
+++ b/tests/models/test_connection.py
@@ -186,3 +186,67 @@ class TestConnection:
     )
     def test_get_uri(self, connection, expected_uri):
         assert connection.get_uri() == expected_uri
+
+    @pytest.mark.parametrize(
+        "connection, expected_conn_id",
+        [
+            # a valid example of connection id
+            (
+                Connection(
+                    conn_id="12312312312213___12312321",
+                    conn_type="type",
+                    login="user",
+                    password="pass",
+                    host="host",
+                    port=100,
+                    schema="schema",
+                    extra={"param1": "val1", "param2": "val2"},
+                ),
+                "12312312312213___12312321",
+            ),
+            # an invalid example of connection id, which allows potential code execution
+            (
+                Connection(
+                    conn_id="<script>alert(1)</script>",
+                    conn_type="type",
+                    host="protocol://host",
+                    port=100,
+                    schema="schema",
+                    extra={"param1": "val1", "param2": "val2"},
+                ),
+                None,
+            ),
+            # a valid connection as well
+            (
+                Connection(
+                    conn_id="a_valid_conn_id_!!##",
+                    conn_type="type",
+                    login="user",
+                    password="pass",
+                    host="protocol://host",
+                    port=100,
+                    schema="schema",
+                    extra={"param1": "val1", "param2": "val2"},
+                ),
+                "a_valid_conn_id_!!##",
+            ),
+            # a valid connection as well testing dashes
+            (
+                Connection(
+                    conn_id="a_-.11",
+                    conn_type="type",
+                    login="user",
+                    password="pass",
+                    host="protocol://host",
+                    port=100,
+                    schema="schema",
+                    extra={"param1": "val1", "param2": "val2"},
+                ),
+                "a_-.11",
+            ),
+        ],
+    )
+    # Responsible for ensuring that the sanitized connection id
+    # string works as expected.
+    def test_sanitize_conn_id(self, connection, expected_conn_id):
+        assert connection.conn_id == expected_conn_id

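For a sense of what the sanitizer accepts and rejects, here is a standalone sketch of the same logic; it substitutes the standard-library re module for the re2 package Airflow actually uses, which is fine for this pattern:

    import re

    # Same pattern as RE_SANITIZE_CONN_ID in airflow/models/connection.py
    RE_SANITIZE_CONN_ID = re.compile(r"^[\w\#\!\(\)\-\.\:\/\\]{1,}$")

    def sanitize_conn_id(conn_id, max_length=250):
        # Reject non-strings, overlong IDs, and anything outside the allowed set
        if not isinstance(conn_id, str) or len(conn_id) > max_length:
            return None
        res = RE_SANITIZE_CONN_ID.match(conn_id)
        return res.group(0) if res else None

    print(sanitize_conn_id("a_valid_conn_id_!!##"))       # a_valid_conn_id_!!##
    print(sanitize_conn_id("<script>alert(1)</script>"))  # None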

(airflow) 23/26: Fix collapsed grid width, line up selected bar with gantt (#37205)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c0c5ab4b6fdfc107265a9dc11290d825db4b88e9
Author: Brent Bovenzi <br...@astronomer.io>
AuthorDate: Tue Feb 6 19:02:04 2024 -0500

    Fix collapsed grid width, line up selected bar with gantt (#37205)
    
    (cherry picked from commit 14360ea824a76799502c4f044f8e21019ee5e79a)
---
 airflow/www/static/js/dag/Main.tsx                |  2 +-
 airflow/www/static/js/dag/details/Header.tsx      |  2 +-
 airflow/www/static/js/dag/details/index.tsx       |  2 +-
 airflow/www/static/js/dag/grid/dagRuns/index.tsx  | 12 +++++++++++-
 airflow/www/static/js/dag/grid/index.tsx          |  3 +--
 airflow/www/static/js/dag/grid/renderTaskRows.tsx |  2 ++
 6 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/airflow/www/static/js/dag/Main.tsx b/airflow/www/static/js/dag/Main.tsx
index 86955792ec..28dffedf28 100644
--- a/airflow/www/static/js/dag/Main.tsx
+++ b/airflow/www/static/js/dag/Main.tsx
@@ -38,7 +38,7 @@ import keyboardShortcutIdentifier from "./keyboardShortcutIdentifier";
 
 const detailsPanelKey = "hideDetailsPanel";
 const minPanelWidth = 300;
-const collapsedWidth = "28px";
+const collapsedWidth = "32px";
 
 const gridWidthKey = "grid-width";
 const saveWidth = debounce(
diff --git a/airflow/www/static/js/dag/details/Header.tsx b/airflow/www/static/js/dag/details/Header.tsx
index d17f04618a..5f0ded364c 100644
--- a/airflow/www/static/js/dag/details/Header.tsx
+++ b/airflow/www/static/js/dag/details/Header.tsx
@@ -87,7 +87,7 @@ const Header = () => {
   const isMappedTaskDetails = runId && taskId && mapIndex !== undefined;
 
   return (
-    <Breadcrumb separator={<Text color="gray.300">/</Text>}>
+    <Breadcrumb ml={3} separator={<Text color="gray.300">/</Text>}>
       <BreadcrumbItem isCurrentPage={isDagDetails} mt={4}>
         <BreadcrumbLink
           onClick={clearSelection}
diff --git a/airflow/www/static/js/dag/details/index.tsx b/airflow/www/static/js/dag/details/index.tsx
index ad2662d61e..918fa7c804 100644
--- a/airflow/www/static/js/dag/details/index.tsx
+++ b/airflow/www/static/js/dag/details/index.tsx
@@ -197,7 +197,7 @@ const Details = ({
       : group?.instances.find((ti) => ti.runId === runId);
 
   return (
-    <Flex flexDirection="column" pl={3} height="100%">
+    <Flex flexDirection="column" height="100%">
       <Flex
         alignItems="center"
         justifyContent="space-between"
diff --git a/airflow/www/static/js/dag/grid/dagRuns/index.tsx b/airflow/www/static/js/dag/grid/dagRuns/index.tsx
index ae9c204d1b..358b10fb38 100644
--- a/airflow/www/static/js/dag/grid/dagRuns/index.tsx
+++ b/airflow/www/static/js/dag/grid/dagRuns/index.tsx
@@ -147,11 +147,21 @@ const DagRuns = ({
           </Flex>
         </Th>
       )}
-      <Th align="right" verticalAlign="bottom">
+      <Th
+        align="right"
+        verticalAlign="bottom"
+        borderRightWidth="16px"
+        borderRightColor="white"
+      >
         <Flex
           justifyContent="flex-end"
           borderBottomWidth={3}
           position="relative"
+          borderRightWidth="16px"
+          borderRightColor="white"
+          marginRight="-16px"
+          borderTopWidth="50px"
+          borderTopColor="white"
         >
           {runs.map((run: RunWithDuration, index) => (
             <DagRunBar
diff --git a/airflow/www/static/js/dag/grid/index.tsx b/airflow/www/static/js/dag/grid/index.tsx
index 57f749a048..b7299e5ba3 100644
--- a/airflow/www/static/js/dag/grid/index.tsx
+++ b/airflow/www/static/js/dag/grid/index.tsx
@@ -165,11 +165,10 @@ const Grid = ({
         ref={gridScrollRef}
         overflow="auto"
         position="relative"
-        pr={4}
         mt={8}
         overscrollBehavior="auto"
       >
-        <Table pr="10px" borderRightWidth="14px" borderColor="transparent">
+        <Table borderRightWidth="16px" borderColor="transparent">
           <Thead>
             <DagRuns
               groups={groups}
diff --git a/airflow/www/static/js/dag/grid/renderTaskRows.tsx b/airflow/www/static/js/dag/grid/renderTaskRows.tsx
index 181186b172..ab33c0c145 100644
--- a/airflow/www/static/js/dag/grid/renderTaskRows.tsx
+++ b/airflow/www/static/js/dag/grid/renderTaskRows.tsx
@@ -146,6 +146,8 @@ const Row = (props: RowProps) => {
         bg={isSelected ? "blue.100" : "inherit"}
         borderBottomWidth={1}
         borderBottomColor={isGroup && isOpen ? "gray.400" : "gray.200"}
+        borderRightWidth="16px"
+        borderRightColor={isSelected ? "blue.100" : "transparent"}
         role="group"
         _hover={!isSelected ? { bg: hoverBlue } : undefined}
         transition="background-color 0.2s"


(airflow) 10/26: fix: disable dryrun autofetch (#36941)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 36cdca55710cd77779f37cc80551e881bf4d1d73
Author: Muhammad Luqman <48...@users.noreply.github.com>
AuthorDate: Tue Jan 23 00:19:07 2024 +0700

    fix: disable dryrun autofetch (#36941)
    
    set useQuery to be enabled only after the action button is clicked, instead of fetching automatically
    
    closes issue #36119
    
    (cherry picked from commit e0f8c6e0ec29a9df6a99a0beccb14874926d7383)
---
 airflow/www/static/js/api/useClearTaskDryRun.ts                      | 5 ++++-
 airflow/www/static/js/api/useMarkTaskDryRun.ts                       | 5 ++++-
 .../static/js/dag/details/taskInstance/taskActions/ClearInstance.tsx | 1 +
 .../js/dag/details/taskInstance/taskActions/MarkInstanceAs.tsx       | 1 +
 4 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/airflow/www/static/js/api/useClearTaskDryRun.ts b/airflow/www/static/js/api/useClearTaskDryRun.ts
index 7f068a001f..33986a9b48 100644
--- a/airflow/www/static/js/api/useClearTaskDryRun.ts
+++ b/airflow/www/static/js/api/useClearTaskDryRun.ts
@@ -39,6 +39,7 @@ const useClearTaskDryRun = ({
   recursive,
   failed,
   mapIndexes = [],
+  enabled = false,
 }: {
   dagId: string;
   runId: string;
@@ -52,6 +53,7 @@ const useClearTaskDryRun = ({
   recursive: boolean;
   failed: boolean;
   mapIndexes?: number[];
+  enabled?: boolean;
 }) =>
   useQuery(
     [
@@ -101,7 +103,8 @@ const useClearTaskDryRun = ({
           },
         }
       );
-    }
+    },
+    { enabled }
   );
 
 export default useClearTaskDryRun;
diff --git a/airflow/www/static/js/api/useMarkTaskDryRun.ts b/airflow/www/static/js/api/useMarkTaskDryRun.ts
index 4a4441819d..5147829941 100644
--- a/airflow/www/static/js/api/useMarkTaskDryRun.ts
+++ b/airflow/www/static/js/api/useMarkTaskDryRun.ts
@@ -36,6 +36,7 @@ const useMarkTaskDryRun = ({
   upstream,
   downstream,
   mapIndexes = [],
+  enabled = false,
 }: {
   dagId: string;
   runId: string;
@@ -47,6 +48,7 @@ const useMarkTaskDryRun = ({
   upstream: boolean;
   downstream: boolean;
   mapIndexes?: number[];
+  enabled?: boolean;
 }) =>
   useQuery(
     [
@@ -84,7 +86,8 @@ const useMarkTaskDryRun = ({
       return axios.get<AxiosResponse, MinimalTaskInstance[]>(confirmUrl, {
         params,
       });
-    }
+    },
+    { enabled }
   );
 
 export default useMarkTaskDryRun;
diff --git a/airflow/www/static/js/dag/details/taskInstance/taskActions/ClearInstance.tsx b/airflow/www/static/js/dag/details/taskInstance/taskActions/ClearInstance.tsx
index d935f99e65..746ba9ca92 100644
--- a/airflow/www/static/js/dag/details/taskInstance/taskActions/ClearInstance.tsx
+++ b/airflow/www/static/js/dag/details/taskInstance/taskActions/ClearInstance.tsx
@@ -102,6 +102,7 @@ const ClearModal = ({
       recursive,
       failed,
       mapIndexes,
+      enabled: isOpen,
     });
 
   const { mutateAsync: clearTask, isLoading } = useClearTask({
diff --git a/airflow/www/static/js/dag/details/taskInstance/taskActions/MarkInstanceAs.tsx b/airflow/www/static/js/dag/details/taskInstance/taskActions/MarkInstanceAs.tsx
index 778fb4e3e2..512b15931c 100644
--- a/airflow/www/static/js/dag/details/taskInstance/taskActions/MarkInstanceAs.tsx
+++ b/airflow/www/static/js/dag/details/taskInstance/taskActions/MarkInstanceAs.tsx
@@ -107,6 +107,7 @@ const MarkAsModal = ({
       upstream,
       downstream,
       mapIndexes,
+      enabled: isOpen,
     }
   );
 


(airflow) 25/26: Adjust node width based on task name length (#37254)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 6a18db922c00618f7db13ac94a45151f2ce01677
Author: Brent Bovenzi <br...@astronomer.io>
AuthorDate: Fri Feb 9 01:12:35 2024 -0500

    Adjust node width based on task name length (#37254)
    
    (cherry picked from commit 9b997638f31fabe9d075a63647e74924f4b5c0c3)
---
 .../www/static/js/dag/{grid => }/TaskName.test.tsx |  13 +--
 airflow/www/static/js/dag/TaskName.tsx             |  89 +++++++++++++++
 .../www/static/js/dag/details/graph/Node.test.tsx  |   1 -
 airflow/www/static/js/dag/details/graph/Node.tsx   | 119 +++++++--------------
 airflow/www/static/js/dag/grid/TaskName.tsx        |  70 ------------
 airflow/www/static/js/dag/grid/index.test.tsx      |  14 +--
 airflow/www/static/js/dag/grid/renderTaskRows.tsx  |  12 ++-
 airflow/www/static/js/utils/graph.ts               |   7 +-
 8 files changed, 149 insertions(+), 176 deletions(-)

diff --git a/airflow/www/static/js/dag/grid/TaskName.test.tsx b/airflow/www/static/js/dag/TaskName.test.tsx
similarity index 83%
rename from airflow/www/static/js/dag/grid/TaskName.test.tsx
rename to airflow/www/static/js/dag/TaskName.test.tsx
index 8002ab78ca..27c7e2419e 100644
--- a/airflow/www/static/js/dag/grid/TaskName.test.tsx
+++ b/airflow/www/static/js/dag/TaskName.test.tsx
@@ -38,13 +38,7 @@ describe("Test TaskName", () => {
 
   test("Displays a mapped task name", () => {
     const { getByText } = render(
-      <TaskName
-        level={0}
-        label="test"
-        id="test"
-        isMapped
-        onToggle={() => {}}
-      />,
+      <TaskName label="test" id="test" isMapped onToggle={() => {}} />,
       { wrapper: ChakraWrapper }
     );
 
@@ -52,12 +46,11 @@ describe("Test TaskName", () => {
   });
 
   test("Displays a group task name", () => {
-    const { getByText, getByTestId } = render(
-      <TaskName level={0} label="test" id="test" isGroup onToggle={() => {}} />,
+    const { getByText } = render(
+      <TaskName label="test" id="test" isGroup onToggle={() => {}} />,
       { wrapper: ChakraWrapper }
     );
 
     expect(getByText("test")).toBeDefined();
-    expect(getByTestId("open-group")).toBeDefined();
   });
 });
diff --git a/airflow/www/static/js/dag/TaskName.tsx b/airflow/www/static/js/dag/TaskName.tsx
new file mode 100644
index 0000000000..cf151d7fd7
--- /dev/null
+++ b/airflow/www/static/js/dag/TaskName.tsx
@@ -0,0 +1,89 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import React, { MouseEventHandler, CSSProperties } from "react";
+import { Text, TextProps, useTheme } from "@chakra-ui/react";
+import { FiChevronUp, FiArrowUpRight, FiArrowDownRight } from "react-icons/fi";
+
+interface Props extends TextProps {
+  isGroup?: boolean;
+  isMapped?: boolean;
+  onToggle: MouseEventHandler<HTMLDivElement>;
+  isOpen?: boolean;
+  label: string;
+  id?: string;
+  setupTeardownType?: string;
+  isZoomedOut?: boolean;
+}
+
+const TaskName = ({
+  isGroup = false,
+  isMapped = false,
+  onToggle,
+  isOpen = false,
+  label,
+  id,
+  setupTeardownType,
+  isZoomedOut,
+  ...rest
+}: Props) => {
+  const { colors } = useTheme();
+  const iconStyle: CSSProperties = {
+    display: "inline",
+    position: "relative",
+    verticalAlign: "middle",
+  };
+  return (
+    <Text
+      cursor={isGroup ? "pointer" : undefined}
+      onClick={onToggle}
+      data-testid={id}
+      width="100%"
+      color={colors.gray[800]}
+      fontSize={isZoomedOut ? 24 : undefined}
+      textAlign="justify"
+      {...rest}
+    >
+      {label}
+      {isMapped && " [ ]"}
+      {isGroup && (
+        <FiChevronUp
+          size={isZoomedOut ? 24 : 15}
+          strokeWidth={3}
+          style={{
+            transition: "transform 0.5s",
+            transform: `rotate(${isOpen ? 0 : 180}deg)`,
+            ...iconStyle,
+          }}
+        />
+      )}
+      {setupTeardownType === "setup" && (
+        <FiArrowUpRight size={isZoomedOut ? 24 : 15} style={iconStyle} />
+      )}
+      {setupTeardownType === "teardown" && (
+        <FiArrowDownRight size={isZoomedOut ? 24 : 15} style={iconStyle} />
+      )}
+    </Text>
+  );
+};
+
+// Only rerender the component if props change
+const MemoizedTaskName = React.memo(TaskName);
+
+export default MemoizedTaskName;
diff --git a/airflow/www/static/js/dag/details/graph/Node.test.tsx b/airflow/www/static/js/dag/details/graph/Node.test.tsx
index 251f3c6aeb..a01114ec04 100644
--- a/airflow/www/static/js/dag/details/graph/Node.test.tsx
+++ b/airflow/www/static/js/dag/details/graph/Node.test.tsx
@@ -110,7 +110,6 @@ describe("Test Graph Node", () => {
 
     expect(getByText("success")).toBeInTheDocument();
     expect(getByText("task_id")).toBeInTheDocument();
-    expect(getByText("+ 5 tasks")).toBeInTheDocument();
   });
 
   test("Renders normal task correctly", async () => {
diff --git a/airflow/www/static/js/dag/details/graph/Node.tsx b/airflow/www/static/js/dag/details/graph/Node.tsx
index 7be5ae7b88..cbe269f997 100644
--- a/airflow/www/static/js/dag/details/graph/Node.tsx
+++ b/airflow/www/static/js/dag/details/graph/Node.tsx
@@ -18,7 +18,7 @@
  */
 
 import React from "react";
-import { Box, Text, Flex, useTheme } from "@chakra-ui/react";
+import { Box, Text, Flex } from "@chakra-ui/react";
 import { Handle, NodeProps, Position } from "reactflow";
 
 import { SimpleStatus } from "src/dag/StatusBox";
@@ -28,7 +28,7 @@ import { getGroupAndMapSummary, hoverDelay } from "src/utils";
 import Tooltip from "src/components/Tooltip";
 import InstanceTooltip from "src/dag/InstanceTooltip";
 import { useContainerRef } from "src/context/containerRef";
-import { ImArrowUpRight2, ImArrowDownRight2 } from "react-icons/im";
+import TaskName from "src/dag/TaskName";
 
 export interface CustomNodeProps {
   label: string;
@@ -69,7 +69,6 @@ export const BaseNode = ({
     isZoomedOut,
   },
 }: NodeProps<CustomNodeProps>) => {
-  const { colors } = useTheme();
   const { onSelect } = useSelection();
   const containerRef = useContainerRef();
 
@@ -138,84 +137,48 @@ export const BaseNode = ({
             });
           }
         }}
+        px={isZoomedOut ? 1 : 2}
+        mt={isZoomedOut ? -2 : 0}
       >
-        <Flex
-          justifyContent="space-between"
-          width={width}
-          p={2}
-          flexWrap={isZoomedOut ? "nowrap" : "wrap"}
-          flexDirection={isZoomedOut ? "row" : "column"}
-        >
-          <Flex flexDirection="column" width="100%">
-            <Flex
-              justifyContent="space-between"
-              alignItems="center"
-              width="100%"
-              fontSize={isZoomedOut ? 25 : undefined}
-              fontWeight="bold"
-            >
-              <Text noOfLines={1} maxWidth={`calc(${width}px - 8px)`}>
-                {taskName}
+        <TaskName
+          label={taskName}
+          isOpen={isOpen}
+          isGroup={!!childCount}
+          onToggle={(e) => {
+            e.stopPropagation();
+            onToggleCollapse();
+          }}
+          setupTeardownType={setupTeardownType}
+          fontWeight="bold"
+          isZoomedOut={isZoomedOut}
+          mt={isZoomedOut ? -2 : 0}
+          noOfLines={2}
+        />
+        {!isZoomedOut && (
+          <>
+            {!!instance && instance.state && (
+              <Flex alignItems="center">
+                <SimpleStatus state={instance.state} />
+                <Text ml={2} color="gray.500" fontWeight={400} fontSize="md">
+                  {instance.state}
+                </Text>
+              </Flex>
+            )}
+            {task?.operator && (
+              <Text
+                noOfLines={1}
+                maxWidth={`calc(${width}px - 12px)`}
+                fontWeight={400}
+                fontSize="md"
+                width="fit-content"
+                color={operatorTextColor}
+                px={1}
+              >
+                {task.operator}
               </Text>
-              {setupTeardownType === "setup" && (
-                <ImArrowUpRight2 size={15} color={colors.gray[800]} />
-              )}
-              {setupTeardownType === "teardown" && (
-                <ImArrowDownRight2 size={15} color={colors.gray[800]} />
-              )}
-            </Flex>
-            {!isZoomedOut && (
-              <>
-                {!!instance && instance.state && (
-                  <Flex alignItems="center">
-                    <SimpleStatus state={instance.state} />
-                    <Text
-                      ml={2}
-                      color="gray.500"
-                      fontWeight={400}
-                      fontSize="md"
-                    >
-                      {instance.state}
-                    </Text>
-                  </Flex>
-                )}
-                {task?.operator && (
-                  <Text
-                    noOfLines={1}
-                    maxWidth={`calc(${width}px - 12px)`}
-                    fontWeight={400}
-                    fontSize="md"
-                    width="fit-content"
-                    color={operatorTextColor}
-                    px={1}
-                  >
-                    {task.operator}
-                  </Text>
-                )}
-              </>
             )}
-          </Flex>
-          {!!childCount && (
-            <Text
-              noOfLines={1}
-              fontSize={isZoomedOut ? 25 : undefined}
-              color="blue.600"
-              cursor="pointer"
-              width="150px"
-              fontWeight="bold"
-              // Increase the target area to expand/collapse a group
-              p={2}
-              m={-2}
-              onClick={(e) => {
-                e.stopPropagation();
-                onToggleCollapse();
-              }}
-            >
-              {isOpen ? "- " : "+ "}
-              {childCount} tasks
-            </Text>
-          )}
-        </Flex>
+          </>
+        )}
       </Box>
     </Tooltip>
   );
diff --git a/airflow/www/static/js/dag/grid/TaskName.tsx b/airflow/www/static/js/dag/grid/TaskName.tsx
deleted file mode 100644
index ce2de10bf6..0000000000
--- a/airflow/www/static/js/dag/grid/TaskName.tsx
+++ /dev/null
@@ -1,70 +0,0 @@
-/*!
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import React from "react";
-import { Text, Flex } from "@chakra-ui/react";
-import { FiChevronUp, FiChevronDown } from "react-icons/fi";
-
-interface Props {
-  isGroup?: boolean;
-  isMapped?: boolean;
-  onToggle: () => void;
-  isOpen?: boolean;
-  level?: number;
-  label: string;
-  id: string;
-}
-
-const TaskName = ({
-  isGroup = false,
-  isMapped = false,
-  onToggle,
-  isOpen = false,
-  level = 0,
-  label,
-  id,
-}: Props) => (
-  <Flex
-    as={isGroup ? "button" : "div"}
-    onClick={onToggle}
-    aria-label={label}
-    data-testid={id}
-    title={label}
-    mr={4}
-    width="100%"
-    alignItems="center"
-    fontWeight={isGroup || isMapped ? "bold" : "normal"}
-  >
-    <Text display="inline" ml={level * 4 + 4} noOfLines={1}>
-      {label}
-      {isMapped && " [ ]"}
-    </Text>
-    {isGroup &&
-      (isOpen ? (
-        <FiChevronUp data-testid="close-group" />
-      ) : (
-        <FiChevronDown data-testid="open-group" />
-      ))}
-  </Flex>
-);
-
-// Only rerender the component if props change
-const MemoizedTaskName = React.memo(TaskName);
-
-export default MemoizedTaskName;
diff --git a/airflow/www/static/js/dag/grid/index.test.tsx b/airflow/www/static/js/dag/grid/index.test.tsx
index 5cc6e71788..3322e3b1b0 100644
--- a/airflow/www/static/js/dag/grid/index.test.tsx
+++ b/airflow/www/static/js/dag/grid/index.test.tsx
@@ -193,7 +193,7 @@ describe("Test ToggleGroups", () => {
   });
 
   test("Group defaults to closed", () => {
-    const { queryAllByTestId, getByText, getAllByTestId } = render(
+    const { getByText, getAllByTestId } = render(
       <Grid openGroupIds={[]} onToggleGroups={() => {}} />,
       { wrapper: Wrapper }
     );
@@ -204,7 +204,6 @@ describe("Test ToggleGroups", () => {
     expect(getAllByTestId("task-instance")).toHaveLength(2);
     expect(groupName1).toBeInTheDocument();
     expect(groupName2).toBeInTheDocument();
-    expect(queryAllByTestId("open-group")).toHaveLength(2);
   });
 
   test("Buttons are disabled if all groups are expanded or collapsed", () => {
@@ -246,25 +245,18 @@ describe("Test ToggleGroups", () => {
     expect(groupName1).toBeInTheDocument();
     expect(groupName2).toBeInTheDocument();
 
-    expect(queryAllByTestId("close-group")).toHaveLength(4);
-    expect(queryAllByTestId("open-group")).toHaveLength(0);
-
     fireEvent.click(collapseButton);
 
     await waitFor(() =>
       expect(queryAllByTestId("task-instance")).toHaveLength(2)
     );
     expect(queryAllByTestId("close-group")).toHaveLength(0);
-    // Since the groups are nested, only the parent rows is rendered
-    expect(queryAllByTestId("open-group")).toHaveLength(2);
 
     fireEvent.click(expandButton);
 
     await waitFor(() =>
       expect(queryAllByTestId("task-instance")).toHaveLength(6)
     );
-    expect(queryAllByTestId("close-group")).toHaveLength(4);
-    expect(queryAllByTestId("open-group")).toHaveLength(0);
   });
 
   test("Toggling a group does not affect groups with the same label", async () => {
@@ -280,8 +272,6 @@ describe("Test ToggleGroups", () => {
     await waitFor(() =>
       expect(queryAllByTestId("task-instance")).toHaveLength(6)
     );
-    expect(queryAllByTestId("close-group")).toHaveLength(4);
-    expect(queryAllByTestId("open-group")).toHaveLength(0);
 
     const group2Task1 = getByTestId("group_2.task_1");
 
@@ -291,8 +281,6 @@ describe("Test ToggleGroups", () => {
     await waitFor(() =>
       expect(queryAllByTestId("task-instance")).toHaveLength(5)
     );
-    expect(queryAllByTestId("close-group")).toHaveLength(3);
-    expect(queryAllByTestId("open-group")).toHaveLength(1);
   });
 
   test("Hovered effect on task state", async () => {
diff --git a/airflow/www/static/js/dag/grid/renderTaskRows.tsx b/airflow/www/static/js/dag/grid/renderTaskRows.tsx
index ab33c0c145..b32c1b4397 100644
--- a/airflow/www/static/js/dag/grid/renderTaskRows.tsx
+++ b/airflow/www/static/js/dag/grid/renderTaskRows.tsx
@@ -24,7 +24,7 @@ import useSelection, { SelectionProps } from "src/dag/useSelection";
 import type { Task, DagRun } from "src/types";
 
 import StatusBox, { boxSize, boxSizePx } from "../StatusBox";
-import TaskName from "./TaskName";
+import TaskName from "../TaskName";
 
 const boxPadding = 3;
 const boxPaddingPx = `${boxPadding}px`;
@@ -172,7 +172,15 @@ const Row = (props: RowProps) => {
               label={task.label || task.id || ""}
               id={task.id || ""}
               isOpen={isOpen}
-              level={level}
+              ml={level * 4 + 4}
+              setupTeardownType={task.setupTeardownType}
+              mr={4}
+              fontWeight={
+                isGroup || (task.isMapped && !isParentMapped)
+                  ? "bold"
+                  : "normal"
+              }
+              noOfLines={1}
             />
           </Td>
         )}
diff --git a/airflow/www/static/js/utils/graph.ts b/airflow/www/static/js/utils/graph.ts
index 1f77990930..71003b0f19 100644
--- a/airflow/www/static/js/utils/graph.ts
+++ b/airflow/www/static/js/utils/graph.ts
@@ -132,7 +132,7 @@ const generateGraph = ({
         },
         label: value.label,
         layoutOptions: {
-          "elk.padding": "[top=60,left=10,bottom=10,right=10]",
+          "elk.padding": "[top=80,left=15,bottom=15,right=15]",
         },
         children: children.map(formatChildNode),
         edges: filteredEdges
@@ -172,6 +172,8 @@ const generateGraph = ({
         }));
       closedGroupIds.push(id);
     }
+    const extraLabelLength =
+      value.label.length > 20 ? value.label.length - 19 : 0;
     return {
       id,
       label: value.label,
@@ -180,7 +182,8 @@ const generateGraph = ({
         isJoinNode,
         childCount,
       },
-      width: isJoinNode ? 10 : 200,
+      // Make tasks with long names wider
+      width: isJoinNode ? 10 : 200 + extraLabelLength * 5,
       height: isJoinNode ? 10 : 70,
     };
   };

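The width arithmetic in graph.ts is simple enough to check by hand; a small sketch of the same formula, ported to Python purely for illustration:

    def node_width(label: str, is_join_node: bool = False) -> int:
        # Labels longer than 20 characters widen the node by 5px per extra char
        if is_join_node:
            return 10
        extra = len(label) - 19 if len(label) > 20 else 0
        return 200 + extra * 5

    print(node_width("short_task"))                    # 200
    print(node_width("a_really_long_task_name_here"))  # 245 (9 extra chars)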

(airflow) 12/26: fix: DagRuns with UPSTREAM_FAILED tasks get stuck in the backfill. (#36954)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2096019798a048170dc66ece345326a849fff8d6
Author: Aleksey Kirilishin <54...@users.noreply.github.com>
AuthorDate: Fri Jan 26 22:41:42 2024 +0300

    fix: DagRuns with UPSTREAM_FAILED tasks get stuck in the backfill. (#36954)
    
    (cherry picked from commit edce53582c7548cdfd6bbe9fd7970c8f4def4155)
---
 airflow/jobs/backfill_job_runner.py                |  8 +++--
 .../test_backfill_with_upstream_failed_task.py     | 36 ++++++++++++++++++++++
 tests/jobs/test_backfill_job.py                    | 30 +++++++++++++++++-
 3 files changed, 70 insertions(+), 4 deletions(-)

diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py
index af8ebd1c4e..101e6cc313 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -106,7 +106,7 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin):
         failed: set[TaskInstanceKey] = attr.ib(factory=set)
         not_ready: set[TaskInstanceKey] = attr.ib(factory=set)
         deadlocked: set[TaskInstance] = attr.ib(factory=set)
-        active_runs: list[DagRun] = attr.ib(factory=list)
+        active_runs: set[DagRun] = attr.ib(factory=set)
         executed_dag_run_dates: set[pendulum.DateTime] = attr.ib(factory=set)
         finished_runs: int = 0
         total_runs: int = 0
@@ -526,6 +526,8 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin):
                             ti_status.running.pop(key)
                         # Reset the failed task in backfill to scheduled state
                         ti.set_state(TaskInstanceState.SCHEDULED, session=session)
+                        if ti.dag_run not in ti_status.active_runs:
+                            ti_status.active_runs.add(ti.dag_run)
                 else:
                     # Default behaviour which works for subdag.
                     if ti.state in (TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED):
@@ -746,7 +748,7 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin):
             session.commit()
 
             # update dag run state
-            _dag_runs = ti_status.active_runs[:]
+            _dag_runs = ti_status.active_runs.copy()
             for run in _dag_runs:
                 run.update_state(session=session)
                 if run.state in State.finished_dr_states:
@@ -848,7 +850,7 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin):
                 dag_run = self._get_dag_run(dagrun_info, dag, session=session)
                 if dag_run is not None:
                     tis_map = self._task_instances_for_dag_run(dag, dag_run, session=session)
-                    ti_status.active_runs.append(dag_run)
+                    ti_status.active_runs.add(dag_run)
                     ti_status.to_run.update(tis_map or {})
 
         processed_dag_run_dates = self._process_backfill_task_instances(
diff --git a/tests/dags/test_backfill_with_upstream_failed_task.py b/tests/dags/test_backfill_with_upstream_failed_task.py
new file mode 100644
index 0000000000..d2cb6353bf
--- /dev/null
+++ b/tests/dags/test_backfill_with_upstream_failed_task.py
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import datetime
+
+from airflow.models.dag import DAG
+from airflow.operators.bash import BashOperator
+
+dag = DAG(
+    dag_id="test_backfill_with_upstream_failed_task",
+    default_args={"retries": 0, "start_date": datetime.datetime(2010, 1, 1)},
+    schedule="0 0 * * *",
+)
+
+failing_task = BashOperator(task_id="failing_task", bash_command="exit 1", dag=dag)
+downstream_task = BashOperator(task_id="downstream_task", bash_command="echo 1", dag=dag)
+downstream_task.set_upstream(failing_task)
+
+if __name__ == "__main__":
+    dag.cli()
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index 88ce758b57..0802f11aa9 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -1916,7 +1916,7 @@ class TestBackfillJob:
         executor = MockExecutor()
 
         ti_status = BackfillJobRunner._DagRunTaskStatus()
-        ti_status.active_runs.append(dr)
+        ti_status.active_runs.add(dr)
         ti_status.to_run = {ti.key: ti for ti in dr.task_instances}
 
         job = Job(executor=executor)
@@ -2103,3 +2103,31 @@ class TestBackfillJob:
         assert dag_run.state == DagRunState.FAILED
 
         dag.clear()
+
+    def test_backfill_failed_dag_with_upstream_failed_task(self, dag_maker):
+        self.dagbag.process_file(str(TEST_DAGS_FOLDER / "test_backfill_with_upstream_failed_task.py"))
+        dag = self.dagbag.get_dag("test_backfill_with_upstream_failed_task")
+
+        # We have to use the "fake" version of perform_heartbeat due to the 'is_unit_test' check in
+        # the original one. However, instead of using the original version of perform_heartbeat,
+        # we can simply wait for a LocalExecutor's worker cycle. The approach with sleep works well now,
+        # but it can be replaced with checking the state of the LocalTaskJob.
+        def fake_perform_heartbeat(*args, **kwargs):
+            import time
+
+            time.sleep(1)
+
+        with mock.patch("airflow.jobs.backfill_job_runner.perform_heartbeat", fake_perform_heartbeat):
+            job = Job(executor=ExecutorLoader.load_executor("LocalExecutor"))
+            job_runner = BackfillJobRunner(
+                job=job,
+                dag=dag,
+                start_date=DEFAULT_DATE,
+                end_date=DEFAULT_DATE,
+                rerun_failed_tasks=True,
+            )
+            with pytest.raises(BackfillUnfinished):
+                run_job(job=job, execute_callable=job_runner._execute)
+
+        dr: DagRun = dag.get_last_dagrun()
+        assert dr.state == State.FAILED

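The switch from a list to a set for active_runs matters because the same DagRun can be re-added when a failed task is reset to SCHEDULED; a minimal sketch using a hypothetical stand-in class:

    class FakeDagRun:
        # Hypothetical stand-in; only identity matters for this demonstration
        def __init__(self, run_id: str):
            self.run_id = run_id

        def __eq__(self, other):
            return self.run_id == other.run_id

        def __hash__(self):
            return hash(self.run_id)

    run = FakeDagRun("backfill__2024-01-01")
    active_runs = set()
    active_runs.add(run)
    active_runs.add(run)  # re-added by rerun_failed_tasks handling
    print(len(active_runs))  # 1 -- a list would now hold the run twice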

(airflow) 24/26: fix: PythonVirtualenvOperator crashes if any python_callable function is defined in the same source as DAG (#37165)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f841e70fc5c36535491afe55c22675ec24784e75
Author: Kalyan <ka...@live.com>
AuthorDate: Wed Feb 7 20:46:09 2024 +0530

    fix: PythonVirtualenvOperator crashes if any python_callable function is defined in the same source as DAG (#37165)
    
    ---------
    
    Signed-off-by: kalyanr <ka...@live.com>
    (cherry picked from commit e75522b0636fb5115d73da43da244d0c3832794f)
---
 airflow/models/dagbag.py                      | 12 +++++++-----
 airflow/operators/python.py                   | 23 +++++++++++++++--------
 airflow/utils/file.py                         | 12 ++++++++++++
 airflow/utils/python_virtualenv_script.jinja2 | 19 +++++++++++++++++--
 4 files changed, 51 insertions(+), 15 deletions(-)

diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index ca81af8105..ce9bf5587b 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -17,7 +17,6 @@
 # under the License.
 from __future__ import annotations
 
-import hashlib
 import importlib
 import importlib.machinery
 import importlib.util
@@ -48,7 +47,12 @@ from airflow.stats import Stats
 from airflow.utils import timezone
 from airflow.utils.dag_cycle_tester import check_cycle
 from airflow.utils.docs import get_docs_url
-from airflow.utils.file import correct_maybe_zipped, list_py_file_paths, might_contain_dag
+from airflow.utils.file import (
+    correct_maybe_zipped,
+    get_unique_dag_module_name,
+    list_py_file_paths,
+    might_contain_dag,
+)
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.retries import MAX_DB_RETRIES, run_with_db_retries
 from airflow.utils.session import NEW_SESSION, provide_session
@@ -326,9 +330,7 @@ class DagBag(LoggingMixin):
             return []
 
         self.log.debug("Importing %s", filepath)
-        path_hash = hashlib.sha1(filepath.encode("utf-8")).hexdigest()
-        org_mod_name = Path(filepath).stem
-        mod_name = f"unusual_prefix_{path_hash}_{org_mod_name}"
+        mod_name = get_unique_dag_module_name(filepath)
 
         if mod_name in sys.modules:
             del sys.modules[mod_name]
diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index 1c5c9d3f69..1b1453cc5e 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -52,6 +52,7 @@ from airflow.models.variable import Variable
 from airflow.operators.branch import BranchMixIn
 from airflow.utils import hashlib_wrapper
 from airflow.utils.context import context_copy_partial, context_merge
+from airflow.utils.file import get_unique_dag_module_name
 from airflow.utils.operator_helpers import KeywordParameters
 from airflow.utils.process_utils import execute_in_subprocess
 from airflow.utils.python_virtualenv import prepare_virtualenv, write_python_script
@@ -437,15 +438,21 @@ class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta):
 
             self._write_args(input_path)
             self._write_string_args(string_args_path)
+
+            jinja_context = {
+                "op_args": self.op_args,
+                "op_kwargs": op_kwargs,
+                "expect_airflow": self.expect_airflow,
+                "pickling_library": self.pickling_library.__name__,
+                "python_callable": self.python_callable.__name__,
+                "python_callable_source": self.get_python_source(),
+            }
+
+            if inspect.getfile(self.python_callable) == self.dag.fileloc:
+                jinja_context["modified_dag_module_name"] = get_unique_dag_module_name(self.dag.fileloc)
+
             write_python_script(
-                jinja_context={
-                    "op_args": self.op_args,
-                    "op_kwargs": op_kwargs,
-                    "expect_airflow": self.expect_airflow,
-                    "pickling_library": self.pickling_library.__name__,
-                    "python_callable": self.python_callable.__name__,
-                    "python_callable_source": self.get_python_source(),
-                },
+                jinja_context=jinja_context,
                 filename=os.fspath(script_path),
                 render_template_as_native_obj=self.dag.render_template_as_native_obj,
             )
diff --git a/airflow/utils/file.py b/airflow/utils/file.py
index 013d9ea36a..7e15eeb2f8 100644
--- a/airflow/utils/file.py
+++ b/airflow/utils/file.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import ast
+import hashlib
 import logging
 import os
 import zipfile
@@ -33,6 +34,8 @@ from airflow.exceptions import RemovedInAirflow3Warning
 
 log = logging.getLogger(__name__)
 
+MODIFIED_DAG_MODULE_NAME = "unusual_prefix_{path_hash}_{module_name}"
+
 
 class _IgnoreRule(Protocol):
     """Interface for ignore rules for structural subtyping."""
@@ -379,3 +382,12 @@ def iter_airflow_imports(file_path: str) -> Generator[str, None, None]:
     for m in _find_imported_modules(parsed):
         if m.startswith("airflow."):
             yield m
+
+
+def get_unique_dag_module_name(file_path: str) -> str:
+    """Returns a unique module name in the format unusual_prefix_{sha1 of module's file path}_{original module name}."""
+    if isinstance(file_path, str):
+        path_hash = hashlib.sha1(file_path.encode("utf-8")).hexdigest()
+        org_mod_name = Path(file_path).stem
+        return MODIFIED_DAG_MODULE_NAME.format(path_hash=path_hash, module_name=org_mod_name)
+    raise ValueError("file_path should be a string to generate unique module name")
diff --git a/airflow/utils/python_virtualenv_script.jinja2 b/airflow/utils/python_virtualenv_script.jinja2
index 7bbf6a9531..4199a47130 100644
--- a/airflow/utils/python_virtualenv_script.jinja2
+++ b/airflow/utils/python_virtualenv_script.jinja2
@@ -34,6 +34,22 @@ if sys.version_info >= (3,6):
         pass
 {% endif %}
 
+# Script
+{{ python_callable_source }}
+
+# monkey patching for the cases when python_callable is part of the dag module.
+{% if modified_dag_module_name is defined %}
+
+import types
+
+{{ modified_dag_module_name }} = types.ModuleType("{{ modified_dag_module_name }}")
+
+{{ modified_dag_module_name }}.{{ python_callable }} = {{ python_callable }}
+
+sys.modules["{{ modified_dag_module_name }}"] = {{ modified_dag_module_name }}
+
+{% endif %}
+
 {% if op_args or op_kwargs %}
 with open(sys.argv[1], "rb") as file:
     arg_dict = {{ pickling_library }}.load(file)
@@ -47,8 +63,7 @@ with open(sys.argv[3], "r") as file:
     virtualenv_string_args = list(map(lambda x: x.strip(), list(file)))
 {% endif %}
 
-# Script
-{{ python_callable_source }}
+
 try:
     res = {{ python_callable }}(*arg_dict["args"], **arg_dict["kwargs"])
 except Exception as e:
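
Why the registration above matters: when the DagBag imports a DAG file, it is
registered under a synthetic module name, and a callable pickled from that file
carries this name in its __module__ attribute. Inside the freshly created
virtualenv interpreter no such module exists, so unpickling op_args/op_kwargs
used to crash. A minimal sketch of the two pieces the patch combines (the path
and callable name are illustrative):

    import hashlib
    import sys
    import types
    from pathlib import Path

    def get_unique_dag_module_name(file_path: str) -> str:
        # Same scheme as airflow/utils/file.py above: hashing the path keeps
        # identically named DAG files in different folders from colliding.
        path_hash = hashlib.sha1(file_path.encode("utf-8")).hexdigest()
        return f"unusual_prefix_{path_hash}_{Path(file_path).stem}"

    # The generated script then recreates that module and points it at the
    # callable that was inlined into the script, so pickle can resolve it:
    mod_name = get_unique_dag_module_name("/dags/my_dag.py")  # illustrative
    module = types.ModuleType(mod_name)
    # module.my_callable = my_callable  # re-bound from the inlined source
    sys.modules[mod_name] = module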


(airflow) 26/26: Fix the bug that affected the DAG end date. (#36144)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit be858a50cf784df88b14011b998d3aa5a4325340
Author: Aleksey Kirilishin <54...@users.noreply.github.com>
AuthorDate: Fri Feb 9 15:53:04 2024 +0300

    Fix the bug that affected the DAG end date. (#36144)
    
    (cherry picked from commit 9f4f208b5da38bc2e82db682c636ec4fcf7ad617)
---
 airflow/api/common/mark_tasks.py                   |  5 --
 airflow/models/dagrun.py                           | 66 +++++++++++++++++++++-
 .../endpoints/test_dag_run_endpoint.py             |  4 +-
 tests/api_experimental/client/test_local_client.py |  4 +-
 tests/api_experimental/common/test_mark_tasks.py   | 49 +++++++++++-----
 tests/models/test_cleartasks.py                    |  5 +-
 6 files changed, 107 insertions(+), 26 deletions(-)

diff --git a/airflow/api/common/mark_tasks.py b/airflow/api/common/mark_tasks.py
index 3cc6dfdfd7..a175a61e20 100644
--- a/airflow/api/common/mark_tasks.py
+++ b/airflow/api/common/mark_tasks.py
@@ -366,11 +366,6 @@ def _set_dag_run_state(dag_id: str, run_id: str, state: DagRunState, session: SA
         select(DagRun).where(DagRun.dag_id == dag_id, DagRun.run_id == run_id)
     ).scalar_one()
     dag_run.state = state
-    if state == DagRunState.RUNNING:
-        dag_run.start_date = timezone.utcnow()
-        dag_run.end_date = None
-    else:
-        dag_run.end_date = timezone.utcnow()
     session.merge(dag_run)
 
 
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index c7f48e1692..59cd7e58b7 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -272,11 +272,75 @@ class DagRun(Base, LoggingMixin):
         return self._state
 
     def set_state(self, state: DagRunState) -> None:
+        """Change the state of the DagRun.
+
+        Changes to attributes are implemented in accordance with the following table
+        (rows represent old states, columns represent new states):
+
+        .. list-table:: State transition matrix
+           :header-rows: 1
+           :stub-columns: 1
+
+           * -
+             - QUEUED
+             - RUNNING
+             - SUCCESS
+             - FAILED
+           * - None
+             - queued_at = timezone.utcnow()
+             - if empty: start_date = timezone.utcnow()
+               end_date = None
+             - end_date = timezone.utcnow()
+             - end_date = timezone.utcnow()
+           * - QUEUED
+             - queued_at = timezone.utcnow()
+             - if empty: start_date = timezone.utcnow()
+               end_date = None
+             - end_date = timezone.utcnow()
+             - end_date = timezone.utcnow()
+           * - RUNNING
+             - queued_at = timezone.utcnow()
+               start_date = None
+               end_date = None
+             -
+             - end_date = timezone.utcnow()
+             - end_date = timezone.utcnow()
+           * - SUCCESS
+             - queued_at = timezone.utcnow()
+               start_date = None
+               end_date = None
+             - start_date = timezone.utcnow()
+               end_date = None
+             -
+             -
+           * - FAILED
+             - queued_at = timezone.utcnow()
+               start_date = None
+               end_date = None
+             - start_date = timezone.utcnow()
+               end_date = None
+             -
+             -
+
+        """
         if state not in State.dag_states:
             raise ValueError(f"invalid DagRun state: {state}")
         if self._state != state:
+            if state == DagRunState.QUEUED:
+                self.queued_at = timezone.utcnow()
+                self.start_date = None
+                self.end_date = None
+            if state == DagRunState.RUNNING:
+                if self._state in State.finished_dr_states:
+                    self.start_date = timezone.utcnow()
+                else:
+                    self.start_date = self.start_date or timezone.utcnow()
+                self.end_date = None
+            if self._state in State.unfinished_dr_states or self._state is None:
+                if state in State.finished_dr_states:
+                    self.end_date = timezone.utcnow()
             self._state = state
-            self.end_date = timezone.utcnow() if self._state in State.finished_dr_states else None
+        else:
             if state == DagRunState.QUEUED:
                 self.queued_at = timezone.utcnow()
 
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 2c4c393dd3..face8f7d75 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -1442,11 +1442,11 @@ class TestPatchDagRunState(TestDagRunEndpoint):
             "conf": {},
             "dag_id": dag_id,
             "dag_run_id": dag_run_id,
-            "end_date": dr.end_date.isoformat(),
+            "end_date": dr.end_date.isoformat() if state != State.QUEUED else None,
             "execution_date": dr.execution_date.isoformat(),
             "external_trigger": False,
             "logical_date": dr.execution_date.isoformat(),
-            "start_date": dr.start_date.isoformat(),
+            "start_date": dr.start_date.isoformat() if state != State.QUEUED else None,
             "state": state,
             "data_interval_start": dr.data_interval_start.isoformat(),
             "data_interval_end": dr.data_interval_end.isoformat(),
diff --git a/tests/api_experimental/client/test_local_client.py b/tests/api_experimental/client/test_local_client.py
index b02a5a5c42..91a81a0caf 100644
--- a/tests/api_experimental/client/test_local_client.py
+++ b/tests/api_experimental/client/test_local_client.py
@@ -135,13 +135,11 @@ class TestLocalClient:
 
             # test output
             queued_at = pendulum.now()
-            started_at = pendulum.now()
             mock.return_value = DagRun(
                 dag_id=test_dag_id,
                 run_id=run_id,
                 queued_at=queued_at,
                 execution_date=EXECDATE,
-                start_date=started_at,
                 external_trigger=True,
                 state=DagRunState.QUEUED,
                 conf={},
@@ -159,7 +157,7 @@ class TestLocalClient:
                 "last_scheduling_decision": None,
                 "logical_date": EXECDATE,
                 "run_type": DagRunType.MANUAL,
-                "start_date": started_at,
+                "start_date": None,
                 "state": DagRunState.QUEUED,
             }
             dag_run = self.client.trigger_dag(dag_id=test_dag_id)
diff --git a/tests/api_experimental/common/test_mark_tasks.py b/tests/api_experimental/common/test_mark_tasks.py
index 47c10fa185..9b28136bba 100644
--- a/tests/api_experimental/common/test_mark_tasks.py
+++ b/tests/api_experimental/common/test_mark_tasks.py
@@ -555,20 +555,28 @@ class TestMarkDAGRun:
         assert dr.get_state() == state
 
     @provide_session
-    def _verify_dag_run_dates(self, dag, date, state, middle_time, session=None):
+    def _verify_dag_run_dates(self, dag, date, state, middle_time=None, old_end_date=None, session=None):
         # When target state is RUNNING, we should set start_date,
         # otherwise we should set end_date.
         DR = DagRun
         dr = session.query(DR).filter(DR.dag_id == dag.dag_id, DR.execution_date == date).one()
         if state == State.RUNNING:
             # Since the DAG is running, the start_date must be updated after creation
-            assert dr.start_date > middle_time
+            if middle_time:
+                assert dr.start_date > middle_time
             # If the dag is still running, we don't have an end date
             assert dr.end_date is None
         else:
-            # If the dag is not running, there must be an end time
-            assert dr.start_date < middle_time
-            assert dr.end_date > middle_time
+            # If the dag is not running, there must be an end time,
+            # and the end time must not be changed if it has already been set.
+            if dr.start_date and middle_time:
+                assert dr.start_date < middle_time
+            if dr.end_date:
+                if old_end_date:
+                    assert dr.end_date == old_end_date
+                else:
+                    if middle_time:
+                        assert dr.end_date > middle_time
 
     def test_set_running_dag_run_to_success(self):
         date = self.execution_dates[0]
@@ -599,30 +607,42 @@ class TestMarkDAGRun:
         assert dr.get_task_instance("run_after_loop").state == State.FAILED
         self._verify_dag_run_dates(self.dag1, date, State.FAILED, middle_time)
 
-    @pytest.mark.parametrize(
-        "dag_run_alter_function, new_state",
-        [(set_dag_run_state_to_running, State.RUNNING), (set_dag_run_state_to_queued, State.QUEUED)],
-    )
-    def test_set_running_dag_run_to_activate_state(self, dag_run_alter_function: Callable, new_state: State):
+    def test_set_running_dag_run_to_running_state(self):
+        date = self.execution_dates[0]  # type: ignore
+        dr = self._create_test_dag_run(State.RUNNING, date)
+        self._set_default_task_instance_states(dr)
+
+        altered = set_dag_run_state_to_running(dag=self.dag1, run_id=dr.run_id, commit=True)  # type: ignore
+
+        # None of the tasks should be altered, only the dag itself
+        assert len(altered) == 0
+        new_state = State.RUNNING
+        self._verify_dag_run_state(self.dag1, date, new_state)  # type: ignore
+        self._verify_task_instance_states_remain_default(dr)
+        self._verify_dag_run_dates(self.dag1, date, new_state)  # type: ignore
+
+    def test_set_running_dag_run_to_queued_state(self):
         date = self.execution_dates[0]  # type: ignore
         dr = self._create_test_dag_run(State.RUNNING, date)
         middle_time = timezone.utcnow()
         self._set_default_task_instance_states(dr)
 
-        altered = dag_run_alter_function(dag=self.dag1, run_id=dr.run_id, commit=True)  # type: ignore
+        altered = set_dag_run_state_to_queued(dag=self.dag1, run_id=dr.run_id, commit=True)  # type: ignore
 
         # None of the tasks should be altered, only the dag itself
         assert len(altered) == 0
+        new_state = State.QUEUED
         self._verify_dag_run_state(self.dag1, date, new_state)  # type: ignore
         self._verify_task_instance_states_remain_default(dr)
         self._verify_dag_run_dates(self.dag1, date, new_state, middle_time)  # type: ignore
 
     @pytest.mark.parametrize("completed_state", [State.SUCCESS, State.FAILED])
-    def test_set_success_dag_run_to_success(self, completed_state):
+    def test_set_completed_dag_run_to_success(self, completed_state):
         date = self.execution_dates[0]
         dr = self._create_test_dag_run(completed_state, date)
         middle_time = timezone.utcnow()
         self._set_default_task_instance_states(dr)
+        old_end_date = dr.end_date
 
         altered = set_dag_run_state_to_success(dag=self.dag1, run_id=dr.run_id, commit=True)
 
@@ -631,13 +651,14 @@ class TestMarkDAGRun:
         assert len(altered) == expected
         self._verify_dag_run_state(self.dag1, date, State.SUCCESS)
         self._verify_task_instance_states(self.dag1, date, State.SUCCESS)
-        self._verify_dag_run_dates(self.dag1, date, State.SUCCESS, middle_time)
+        self._verify_dag_run_dates(self.dag1, date, State.SUCCESS, middle_time, old_end_date)
 
     @pytest.mark.parametrize("completed_state", [State.SUCCESS, State.FAILED])
     def test_set_completed_dag_run_to_failed(self, completed_state):
         date = self.execution_dates[0]
         dr = self._create_test_dag_run(completed_state, date)
         middle_time = timezone.utcnow()
+        old_end_date = dr.end_date
         self._set_default_task_instance_states(dr)
 
         altered = set_dag_run_state_to_failed(dag=self.dag1, run_id=dr.run_id, commit=True)
@@ -646,7 +667,7 @@ class TestMarkDAGRun:
         assert len(altered) == expected
         self._verify_dag_run_state(self.dag1, date, State.FAILED)
         assert dr.get_task_instance("run_after_loop").state == State.FAILED
-        self._verify_dag_run_dates(self.dag1, date, State.FAILED, middle_time)
+        self._verify_dag_run_dates(self.dag1, date, State.FAILED, middle_time, old_end_date)
 
     @pytest.mark.parametrize(
         "dag_run_alter_function,new_state",
diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py
index ed0232926a..bce9dc4668 100644
--- a/tests/models/test_cleartasks.py
+++ b/tests/models/test_cleartasks.py
@@ -210,7 +210,10 @@ class TestClearTasks:
         session.refresh(dr)
 
         assert dr.state == state
-        assert dr.start_date
+        if state == DagRunState.QUEUED:
+            assert dr.start_date is None
+        if state == DagRunState.RUNNING:
+            assert dr.start_date
         assert dr.last_scheduling_decision == DEFAULT_DATE
 
     @pytest.mark.parametrize(
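
The transition matrix in the new set_state docstring boils down to three
rules: entering QUEUED clears both dates, entering RUNNING keeps an existing
start_date unless the run had already finished, and end_date is only stamped
when an unfinished run finishes. A standalone sketch of that bookkeeping
(plain Python, not Airflow code):

    from datetime import datetime, timezone

    FINISHED = {"success", "failed"}

    class MiniRun:
        """Illustrative stand-in for DagRun's date handling."""

        def __init__(self):
            self.state = None
            self.queued_at = self.start_date = self.end_date = None

        def set_state(self, new):
            now = datetime.now(timezone.utc)
            if new == self.state:
                if new == "queued":
                    self.queued_at = now  # re-queueing refreshes queued_at only
                return
            if new == "queued":
                self.queued_at, self.start_date, self.end_date = now, None, None
            elif new == "running":
                # queued -> running keeps the first start_date; reviving a
                # finished run gets a fresh one
                self.start_date = now if self.state in FINISHED else (self.start_date or now)
                self.end_date = None
            elif self.state not in FINISHED:
                self.end_date = now  # an already finished run keeps its end_date
            self.state = new

    run = MiniRun()
    run.set_state("queued")
    run.set_state("running")
    run.set_state("success")
    first_end = run.end_date
    run.set_state("success")  # same state again: end_date is preserved
    assert run.end_date == first_end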


(airflow) 07/26: Link to release notes in the upgrade docs (#36923)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 61367fc69f876ab3557525b23edbe07141a4201a
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Fri Jan 19 16:58:23 2024 -0700

    Link to release notes in the upgrade docs (#36923)
    
    (cherry picked from commit cad8ab31bff542f0a02727e7bafe87b275feceb2)
---
 docs/apache-airflow/installation/upgrading.rst | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/docs/apache-airflow/installation/upgrading.rst b/docs/apache-airflow/installation/upgrading.rst
index 6328e887b0..8fc99d7475 100644
--- a/docs/apache-airflow/installation/upgrading.rst
+++ b/docs/apache-airflow/installation/upgrading.rst
@@ -25,6 +25,11 @@ Newer Airflow versions can contain database migrations so you must run ``airflow
 to migrate your database with the schema changes in the Airflow version you are upgrading to.
 Don't worry, it's safe to run even if there are no migrations to perform.
 
+What are the changes between Airflow version x and y?
+=====================================================
+
+The :doc:`release notes <../release_notes>` list the changes that were included in any given Airflow release.
+
 Upgrade preparation - make a backup of DB
 =========================================
 


(airflow) 13/26: add description to queue_when (#36997)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 198dd64f2a7d2e73ffd6c7ea1a1263092546b1f7
Author: Gerson Scheffer <54...@users.noreply.github.com>
AuthorDate: Sat Jan 27 12:35:14 2024 -0300

    add description to queue_when (#36997)
    
    (cherry picked from commit 1bb8126ddc00edb8e0cb3076977ab396dab3f5c1)
---
 airflow/api_connexion/openapi/v1.yaml | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index 0c34c44876..01cb7ca067 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -3397,6 +3397,8 @@ components:
         queued_when:
           type: string
           nullable: true
+          description: |
+            The datetime when the task entered the QUEUED state (also known as queued_at)
         pid:
           type: integer
           nullable: true
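
The queued_when field then surfaces in task instance payloads from the stable
REST API. A sketch of reading it (the local webserver, basic-auth API backend,
and all IDs below are assumptions for illustration):

    import requests

    resp = requests.get(
        "http://localhost:8080/api/v1/dags/example_bash_operator"
        "/dagRuns/manual__2024-01-01T00.00.00/taskInstances/runme_0",
        auth=("admin", "admin"),
    )
    resp.raise_for_status()
    print(resp.json()["queued_when"])  # ISO timestamp, or None if never queued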


(airflow) 04/26: Fix webserver always redirecting to home page if user was not logged in (#36833)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit d3b027d9141ae80620f3dde47366a7bb14d29392
Author: Carlos Sánchez Páez <ca...@feverup.com>
AuthorDate: Wed Jan 17 18:09:13 2024 +0100

    Fix webserver always redirecting to home page if user was not logged in (#36833)
    
    (cherry picked from commit 3266e94a4da649f863cb01461adb5ac26db1722c)
---
 airflow/www/auth.py                             | 4 ++--
 tests/www/views/test_anonymous_as_admin_role.py | 4 +++-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/airflow/www/auth.py b/airflow/www/auth.py
index 97a79cc533..db3d8f2ab3 100644
--- a/airflow/www/auth.py
+++ b/airflow/www/auth.py
@@ -116,7 +116,7 @@ def has_access_with_pk(f):
         else:
             log.warning(LOGMSG_ERR_SEC_ACCESS_DENIED, permission_str, self.__class__.__name__)
             flash(as_unicode(FLAMSG_ERR_SEC_ACCESS_DENIED), "danger")
-        return redirect(get_auth_manager().get_url_login(next=request.url))
+        return redirect(get_auth_manager().get_url_login(next_url=request.url))
 
     f._permission_name = permission_str
     return functools.update_wrapper(wraps, f)
@@ -173,7 +173,7 @@ def _has_access(*, is_authorized: bool, func: Callable, args, kwargs):
             403,
         )
     elif not get_auth_manager().is_logged_in():
-        return redirect(get_auth_manager().get_url_login(next=request.url))
+        return redirect(get_auth_manager().get_url_login(next_url=request.url))
     else:
         access_denied = get_access_denied_message()
         flash(access_denied, "danger")
diff --git a/tests/www/views/test_anonymous_as_admin_role.py b/tests/www/views/test_anonymous_as_admin_role.py
index 214f2ee3c9..a78485ea97 100644
--- a/tests/www/views/test_anonymous_as_admin_role.py
+++ b/tests/www/views/test_anonymous_as_admin_role.py
@@ -17,6 +17,8 @@
 # under the License.
 from __future__ import annotations
 
+from urllib.parse import quote_plus
+
 import pytest
 
 from airflow.models import Pool
@@ -54,7 +56,7 @@ def test_delete_pool_anonymous_user_no_role(anonymous_client, pool_factory):
     pool = pool_factory()
     resp = anonymous_client.post(f"pool/delete/{pool.id}")
     assert 302 == resp.status_code
-    assert "/login/" == resp.headers["Location"]
+    assert f"/login/?next={quote_plus(f'http://localhost/pool/delete/{pool.id}')}" == resp.headers["Location"]
 
 
 def test_delete_pool_anonymous_user_as_admin(anonymous_client_as_admin, pool_factory):
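
The updated assertion shows the shape of the fix: instead of bouncing an
unauthenticated request to a bare /login/ (which then redirected to the home
page), the denied request's full URL now rides along in the next parameter.
A minimal sketch of the target URL the test asserts (the helper name is ours,
not Airflow's):

    from urllib.parse import quote_plus

    def login_redirect_target(requested_url: str) -> str:
        # After logging in, the auth manager can send the user back here.
        return f"/login/?next={quote_plus(requested_url)}"

    assert (
        login_redirect_target("http://localhost/pool/delete/1")
        == "/login/?next=http%3A%2F%2Flocalhost%2Fpool%2Fdelete%2F1"
    )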


(airflow) 05/26: Fix bug introduced by replacing spaces by + in run_id (#36877)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 83d630c7fb77e0c0989a85b0d45daa4d06410c99
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Jan 18 23:53:14 2024 +0100

    Fix bug introduced by replacing spaces by + in run_id (#36877)
    
    While working on sanitizing the run_id(#32293), I replaced spaces with +
    which is not an ideal way to quote a URL. This led to issues when users
    use spaces in their DAG run_id.
    This commit fixes the issue by using urllib.parse.quote to properly quote
    the URL.
    
    (cherry picked from commit ec8fab5d2a0c1c30aa4f45f6d1a54d2610a6594e)
---
 airflow/www/views.py                      | 2 +-
 tests/www/views/test_views_trigger_dag.py | 2 ++
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index 74ff10bed2..f2ea54e81a 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1944,7 +1944,7 @@ class Airflow(AirflowBaseView):
     @provide_session
     def trigger(self, dag_id: str, session: Session = NEW_SESSION):
         """Triggers DAG Run."""
-        run_id = request.values.get("run_id", "").replace(" ", "+")
+        run_id = request.values.get("run_id", "")
         origin = get_safe_url(request.values.get("origin"))
         unpause = request.values.get("unpause")
         request_conf = request.values.get("conf")
diff --git a/tests/www/views/test_views_trigger_dag.py b/tests/www/views/test_views_trigger_dag.py
index 6471c092bd..7e89d29f5b 100644
--- a/tests/www/views/test_views_trigger_dag.py
+++ b/tests/www/views/test_views_trigger_dag.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 
 import datetime
 import json
+from urllib.parse import quote
 
 import pytest
 
@@ -369,6 +370,7 @@ def test_trigger_dag_params_array_value_none_render(admin_client, dag_maker, ses
 def test_dag_run_id_pattern(session, admin_client, pattern, run_id, result):
     with conf_vars({("scheduler", "allowed_run_id_pattern"): pattern}):
         test_dag_id = "example_bash_operator"
+        run_id = quote(run_id)
         admin_client.post(f"dags/{test_dag_id}/trigger?run_id={run_id}", data={"conf": "{}"})
         run = session.query(DagRun).filter(DagRun.dag_id == test_dag_id).first()
         if result:
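
The difference between the two approaches, in short: replace(" ", "+")
rewrites the stored run_id itself, while percent-encoding only changes the
value in transit and round-trips cleanly:

    from urllib.parse import quote, unquote

    run_id = "manual run 2024-01-18"

    # Old behaviour: the value itself was mutated before being stored.
    assert run_id.replace(" ", "+") == "manual+run+2024-01-18"

    # New behaviour: encode for the URL only; decoding restores the original.
    assert quote(run_id) == "manual%20run%202024-01-18"
    assert unquote(quote(run_id)) == run_id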


(airflow) 11/26: Updated config.yml for environment variable sql_alchemy_connect_args (#36526)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 29930bea0932abe428e606e24733a91f344862ce
Author: koushik-rout-samsung <14...@users.noreply.github.com>
AuthorDate: Thu Jan 25 18:00:03 2024 +0000

    Updated config.yml for environment variable sql_alchemy_connect_args  (#36526)
    
    * Update config.yml
    
    * Updated config.yml
    
    added more description
    
    * conf update
    
    * lint check update
    
    * Update config.yml
    
    extra space removed
    
    * Update config.yml
    
    change for pytest
    
    (cherry picked from commit 8fd9fdb812b71e50dc120d65a5cd0b43bcfcef13)
---
 airflow/config_templates/config.yml | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 407d0cb05a..4107a49895 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -583,11 +583,15 @@ database:
       description: |
         Import path for connect args in SqlAlchemy. Defaults to an empty dict.
         This is useful when you want to configure db engine args that SqlAlchemy won't parse
-        in connection string.
-        See https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine.params.connect_args
+        in connection string. This can be set by passing a dictionary containing the create engine parameters.
+        For more details about passing create engine parameters (keepalive variables, timeouts, etc.)
+        in a PostgreSQL connection, see
+        https://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#setting-up-a-postgresql-database
+        e.g. connect_args={"timeout": 30} can be defined in airflow_local_settings.py and
+        imported via its path, as shown in the example below
       version_added: 2.3.0
       type: string
-      example: '{"timeout": 30}'
+      example: 'airflow_local_settings.connect_args'
       default: ~
     load_default_connections:
       description: |
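
A minimal sketch of wiring this option up (the module name and values are
illustrative; any import path reachable from Airflow's PYTHONPATH works):

    # airflow_local_settings.py
    connect_args = {
        # Illustrative psycopg2 keywords; anything accepted by SQLAlchemy's
        # create_engine(connect_args=...) can go here.
        "connect_timeout": 30,
        "keepalives": 1,
    }

    # Then reference the dict by import path, e.g. in airflow.cfg:
    #   [database]
    #   sql_alchemy_connect_args = airflow_local_settings.connect_args
    # or via the environment:
    #   AIRFLOW__DATABASE__SQL_ALCHEMY_CONNECT_ARGS=airflow_local_settings.connect_args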


(airflow) 17/26: Displaying "actual" try number in TaskInstance view (#34635)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0fc68e0ffd4c97645499c97c5ce9a8138bcccd77
Author: Sam Wheating <sa...@gmail.com>
AuthorDate: Wed Jan 31 06:47:35 2024 -0800

    Displaying "actual" try number in TaskInstance view (#34635)
    
    (cherry picked from commit ce6ac4176445efbd6097cdcdb10782752a4f1bf9)
---
 airflow/models/taskinstance.py | 12 ++++++++++++
 airflow/www/views.py           |  6 ++----
 2 files changed, 14 insertions(+), 4 deletions(-)
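
For context on the taskinstance.py change below: try_number is a SQLAlchemy
hybrid property (that is what the .expression and .setter decorators belong
to), so instance access can return an adjusted value while the expression
override controls the SQL emitted for filters. A minimal standalone sketch of
the pattern (the model is ours, not Airflow's):

    from sqlalchemy import Column, Integer
    from sqlalchemy.ext.hybrid import hybrid_property
    from sqlalchemy.orm import declarative_base

    Base = declarative_base()

    class Task(Base):
        __tablename__ = "task"
        id = Column(Integer, primary_key=True)
        _try_number = Column("try_number", Integer, default=0)

        @hybrid_property
        def try_number(self):
            # Instance access: value adjusted in Python.
            return self._try_number + 1

        @try_number.expression
        def try_number(cls):
            # Query access: compare against the raw column so list-view
            # filters are not off by one.
            return cls._try_number

    # Task.try_number == 3 compiles to "task.try_number = 3", not "... + 1 = 3".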

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index b653964580..cbbc2b726c 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1421,6 +1421,18 @@ class TaskInstance(Base, LoggingMixin):
         """
         return _get_try_number(task_instance=self)
 
+    @try_number.expression
+    def try_number(cls):
+        """
+        This is what will be used by SQLAlchemy when filtering on try_number.
+
+        This is required because the override in the get_try_number function causes
+        try_number values to be off by one when listing tasks in the UI.
+
+        :meta private:
+        """
+        return cls._try_number
+
     @try_number.setter
     def try_number(self, value: int) -> None:
         """
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 4bf06d35c7..5252d20ce3 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -5483,7 +5483,7 @@ class TaskInstanceModelView(AirflowModelView):
         "priority_weight",
         "queue",
         "queued_dttm",
-        "try_number",
+        "prev_attempted_tries",
         "pool",
         "queued_by_job_id",
         "external_executor_id",
@@ -5512,9 +5512,7 @@ class TaskInstanceModelView(AirflowModelView):
         "queued_by_job_id",
     ]
 
-    label_columns = {
-        "dag_run.execution_date": "Logical Date",
-    }
+    label_columns = {"dag_run.execution_date": "Logical Date", "prev_attempted_tries": "Try Number"}
 
     search_columns = [
         "state",