You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/10/28 19:26:25 UTC

(airflow) branch main updated: Allow optional defaults in required fields with manual triggered dags (#31301)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3d7c8f7d05 Allow optional defaults in required fields with manual triggered dags (#31301)
3d7c8f7d05 is described below

commit 3d7c8f7d058284a4eb7e5e42eda56b3c284b7866
Author: Jens Scheffler <95...@users.noreply.github.com>
AuthorDate: Sat Oct 28 21:26:18 2023 +0200

    Allow optional defaults in required fields with manual triggered dags (#31301)
---
 airflow/example_dags/example_params_ui_tutorial.py | 24 +++++++--------------
 airflow/models/dag.py                              |  2 +-
 docs/apache-airflow/core-concepts/params.rst       | 11 +++++++++-
 tests/dags/test_invalid_param.py                   |  2 +-
 ...est_invalid_param.py => test_invalid_param2.py} | 10 ++++-----
 ...est_invalid_param.py => test_invalid_param3.py} | 12 +++++------
 ...est_invalid_param.py => test_invalid_param4.py} | 10 ++++-----
 .../{test_invalid_param.py => test_valid_param.py} | 12 +++++++----
 ...{test_invalid_param.py => test_valid_param2.py} | 12 ++++++-----
 tests/jobs/test_scheduler_job.py                   |  3 +++
 tests/models/test_dagbag.py                        | 25 ++++++++++++++++++++--
 11 files changed, 77 insertions(+), 46 deletions(-)

diff --git a/airflow/example_dags/example_params_ui_tutorial.py b/airflow/example_dags/example_params_ui_tutorial.py
index 4373676b6a..12992c545c 100644
--- a/airflow/example_dags/example_params_ui_tutorial.py
+++ b/airflow/example_dags/example_params_ui_tutorial.py
@@ -25,16 +25,10 @@ from __future__ import annotations
 import datetime
 import json
 from pathlib import Path
-from typing import TYPE_CHECKING
 
 from airflow.decorators import task
-from airflow.exceptions import AirflowSkipException
 from airflow.models.dag import DAG
-from airflow.models.param import Param
-
-if TYPE_CHECKING:
-    from airflow.models.dagrun import DagRun
-    from airflow.models.taskinstance import TaskInstance
+from airflow.models.param import Param, ParamsDict
 
 with DAG(
     dag_id=Path(__file__).stem,
@@ -170,9 +164,11 @@ with DAG(
         ),
         # Fields can be required or not. If the defined fields are typed they are getting required by default
         # (else they would not pass JSON schema validation) - to make typed fields optional you must
-        # permit the optional "null" type
+        # permit the optional "null" type.
+        # You can omit a default value if the DAG is triggered manually
         "required_field": Param(
-            "You can not trigger if no text is given here!",
+            # In this example we have no default value.
+            # The form will require users to supply a value before they can trigger.
             type="string",
             title="Required text field",
             description="This field is required. You can not submit without having text in here.",
@@ -303,13 +299,9 @@ with DAG(
     },
 ) as dag:
 
-    @task(task_id="show_params")
+    @task
     def show_params(**kwargs) -> None:
-        ti: TaskInstance = kwargs["ti"]
-        dag_run: DagRun = ti.dag_run
-        if not dag_run.conf:
-            print("Uups, no parameters supplied as DagRun.conf, was the trigger w/o form?")
-            raise AirflowSkipException("No DagRun.conf parameters supplied.")
-        print(f"This DAG was triggered with the following parameters:\n{json.dumps(dag_run.conf, indent=4)}")
+        params: ParamsDict = kwargs["params"]
+        print(f"This DAG was triggered with the following parameters:\n\n{json.dumps(params, indent=4)}\n")
 
     show_params()
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index d2540a250b..bef0bde9a7 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -726,7 +726,7 @@ class DAG(LoggingMixin):
                 f"inconsistent schedule: timetable {self.timetable.summary!r} "
                 f"does not match schedule_interval {self.schedule_interval!r}",
             )
-        self.params.validate()
+        self.validate_schedule_and_params()
         self.timetable.validate()
         self.validate_setup_teardown()
 
diff --git a/docs/apache-airflow/core-concepts/params.rst b/docs/apache-airflow/core-concepts/params.rst
index 42fd523688..b2b95252ec 100644
--- a/docs/apache-airflow/core-concepts/params.rst
+++ b/docs/apache-airflow/core-concepts/params.rst
@@ -160,6 +160,12 @@ JSON Schema Validation
         },
     ):
 
+.. note::
+    If ``schedule`` is defined for a DAG, params with defaults must be valid. This is validated during DAG parsing.
+    If ``schedule=None`` then params are not validated during DAG parsing but before triggering a DAG.
+    This is useful in cases where the DAG author does not want to provide defaults but wants to force users to provide valid parameters
+    at time of trigger.
+
 .. note::
     As of now, for security reasons, one can not use :class:`~airflow.models.param.Param` objects derived out of custom classes. We are
     planning to have a registration system for custom :class:`~airflow.models.param.Param` classes, just like we've for Operator ExtraLinks.
@@ -298,7 +304,6 @@ The following features are supported in the Trigger UI Form:
           -
           - ``Param(None, type=["null", "string"])``
 
-
 - If a form field is left empty, it is passed as ``None`` value to the params dict.
 - Form fields are rendered in the order of definition of ``params`` in the DAG.
 - If you want to add sections to the Form, add the attribute ``section`` to each field. The text will be used as section label.
@@ -310,6 +315,10 @@ The following features are supported in the Trigger UI Form:
   If you want to change values manually, the JSON configuration can be adjusted. Changes are overridden when form fields change.
 - If you want to render custom HTML as form on top of the provided features, you can use the ``custom_html_form`` attribute.
 
+.. note::
+    If the field is required, the default value must be valid according to the schema as well. If the DAG is defined with
+    ``schedule=None``, the parameter value validation is performed at time of trigger.
+
 For examples also please take a look to two example DAGs provided: ``example_params_trigger_ui`` and ``example_params_ui_tutorial``.
 
 .. image:: ../img/trigger-dag-tutorial-form.png
diff --git a/tests/dags/test_invalid_param.py b/tests/dags/test_invalid_param.py
index 7e278cf02f..9f5fdfa05f 100644
--- a/tests/dags/test_invalid_param.py
+++ b/tests/dags/test_invalid_param.py
@@ -25,7 +25,7 @@ from airflow.operators.python import PythonOperator
 with DAG(
     "test_invalid_param",
     start_date=datetime(2021, 1, 1),
-    schedule="@once",
+    schedule="0 0 * * *",
     params={
         # a mandatory str param
         "str_param": Param(type="string", minLength=2, maxLength=4),
diff --git a/tests/dags/test_invalid_param.py b/tests/dags/test_invalid_param2.py
similarity index 84%
copy from tests/dags/test_invalid_param.py
copy to tests/dags/test_invalid_param2.py
index 7e278cf02f..7ed9d5c443 100644
--- a/tests/dags/test_invalid_param.py
+++ b/tests/dags/test_invalid_param2.py
@@ -18,17 +18,17 @@ from __future__ import annotations
 
 from datetime import datetime
 
-from airflow.models.dag import DAG
+from airflow import DAG
 from airflow.models.param import Param
 from airflow.operators.python import PythonOperator
 
 with DAG(
-    "test_invalid_param",
+    "test_invalid_param2",
     start_date=datetime(2021, 1, 1),
-    schedule="@once",
+    schedule="0 0 * * *",
     params={
-        # a mandatory str param
-        "str_param": Param(type="string", minLength=2, maxLength=4),
+        # a mandatory str param but pass None as value which is invalid
+        "str_param": Param(default=None, type="string", minLength=2, maxLength=4),
     },
 ) as the_dag:
 
diff --git a/tests/dags/test_invalid_param.py b/tests/dags/test_invalid_param3.py
similarity index 83%
copy from tests/dags/test_invalid_param.py
copy to tests/dags/test_invalid_param3.py
index 7e278cf02f..67cb1bbff7 100644
--- a/tests/dags/test_invalid_param.py
+++ b/tests/dags/test_invalid_param3.py
@@ -18,17 +18,17 @@ from __future__ import annotations
 
 from datetime import datetime
 
-from airflow.models.dag import DAG
+from airflow import DAG
 from airflow.models.param import Param
 from airflow.operators.python import PythonOperator
 
 with DAG(
-    "test_invalid_param",
+    "test_invalid_param3",
     start_date=datetime(2021, 1, 1),
-    schedule="@once",
+    schedule="0 0 * * *",
     params={
-        # a mandatory str param
-        "str_param": Param(type="string", minLength=2, maxLength=4),
+        # a mandatory number param but pass a string as default value
+        "int_param": Param(default="banana", type="integer"),
     },
 ) as the_dag:
 
@@ -40,6 +40,6 @@ with DAG(
         task_id="ref_params",
         python_callable=print_these,
         op_args=[
-            "{{ params.str_param }}",
+            "{{ params.int_param }}",
         ],
     )
diff --git a/tests/dags/test_invalid_param.py b/tests/dags/test_invalid_param4.py
similarity index 83%
copy from tests/dags/test_invalid_param.py
copy to tests/dags/test_invalid_param4.py
index 7e278cf02f..7c6354af0d 100644
--- a/tests/dags/test_invalid_param.py
+++ b/tests/dags/test_invalid_param4.py
@@ -18,17 +18,17 @@ from __future__ import annotations
 
 from datetime import datetime
 
-from airflow.models.dag import DAG
+from airflow import DAG
 from airflow.models.param import Param
 from airflow.operators.python import PythonOperator
 
 with DAG(
-    "test_invalid_param",
+    "test_invalid_param4",
     start_date=datetime(2021, 1, 1),
-    schedule="@once",
+    schedule="0 0 * * *",
     params={
-        # a mandatory str param
-        "str_param": Param(type="string", minLength=2, maxLength=4),
+        # a mandatory string but the default is not valid in length validation
+        "str_param": Param(default="banana", type="string", minLength=2, maxLength=4),
     },
 ) as the_dag:
 
diff --git a/tests/dags/test_invalid_param.py b/tests/dags/test_valid_param.py
similarity index 75%
copy from tests/dags/test_invalid_param.py
copy to tests/dags/test_valid_param.py
index 7e278cf02f..67a6b4e814 100644
--- a/tests/dags/test_invalid_param.py
+++ b/tests/dags/test_valid_param.py
@@ -18,17 +18,21 @@ from __future__ import annotations
 
 from datetime import datetime
 
-from airflow.models.dag import DAG
+from airflow import DAG
 from airflow.models.param import Param
 from airflow.operators.python import PythonOperator
 
 with DAG(
-    "test_invalid_param",
+    "test_valid_param",
     start_date=datetime(2021, 1, 1),
-    schedule="@once",
+    schedule=None,
     params={
-        # a mandatory str param
+        # a string default is not mandatory as DAG has no schedule
         "str_param": Param(type="string", minLength=2, maxLength=4),
+        # a string with None as default is also accepted as no schedule
+        "str_param2": Param(None, type="string", minLength=2, maxLength=4),
+        # But of course adding a valid default is also fine
+        "str_param3": Param("valid_default", type="string", minLength=2, maxLength=15),
     },
 ) as the_dag:
 
diff --git a/tests/dags/test_invalid_param.py b/tests/dags/test_valid_param2.py
similarity index 77%
copy from tests/dags/test_invalid_param.py
copy to tests/dags/test_valid_param2.py
index 7e278cf02f..9767357d3b 100644
--- a/tests/dags/test_invalid_param.py
+++ b/tests/dags/test_valid_param2.py
@@ -18,17 +18,19 @@ from __future__ import annotations
 
 from datetime import datetime
 
-from airflow.models.dag import DAG
+from airflow import DAG
 from airflow.models.param import Param
 from airflow.operators.python import PythonOperator
 
 with DAG(
-    "test_invalid_param",
+    "test_valid_param2",
     start_date=datetime(2021, 1, 1),
-    schedule="@once",
+    schedule="0 0 * * *",
     params={
-        # a mandatory str param
-        "str_param": Param(type="string", minLength=2, maxLength=4),
+        # mandatory string has default, this is how we want it!
+        "str_param": Param("some_default", type="string", minLength=2, maxLength=12),
+        # Field does not need to have a default if type is nullable
+        "optional_str_param": Param(None, type=["null", "string"]),
     },
 ) as the_dag:
 
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 5e83c77a64..43cf0dc282 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3110,6 +3110,9 @@ class TestSchedulerJob:
             "test_invalid_dup_task.py",
             "test_ignore_this.py",
             "test_invalid_param.py",
+            "test_invalid_param2.py",
+            "test_invalid_param3.py",
+            "test_invalid_param4.py",
             "test_nested_dag.py",
             "test_imports.py",
             "__init__.py",
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index e204a16d10..8ebff1dc16 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -284,9 +284,14 @@ class TestDagBag:
 
     def test_process_file_invalid_param_check(self, tmp_path):
         """
-        test if an invalid param in the dag param can be identified
+        test if an invalid param in the dags can be identified
         """
-        invalid_dag_files = ["test_invalid_param.py"]
+        invalid_dag_files = [
+            "test_invalid_param.py",
+            "test_invalid_param2.py",
+            "test_invalid_param3.py",
+            "test_invalid_param4.py",
+        ]
         dagbag = DagBag(dag_folder=os.fspath(tmp_path), include_examples=False)
 
         assert len(dagbag.import_errors) == 0
@@ -295,6 +300,22 @@ class TestDagBag:
         assert len(dagbag.import_errors) == len(invalid_dag_files)
         assert len(dagbag.dags) == 0
 
+    def test_process_file_valid_param_check(self, tmp_path):
+        """
+        test if valid params in the dags param can be validated (positive test)
+        """
+        valid_dag_files = [
+            "test_valid_param.py",
+            "test_valid_param2.py",
+        ]
+        dagbag = DagBag(dag_folder=os.fspath(tmp_path), include_examples=False)
+
+        assert len(dagbag.import_errors) == 0
+        for file in valid_dag_files:
+            dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, file))
+        assert len(dagbag.import_errors) == 0
+        assert len(dagbag.dags) == len(valid_dag_files)
+
     @patch.object(DagModel, "get_current")
     def test_get_dag_without_refresh(self, mock_dagmodel):
         """