You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/10/28 19:26:25 UTC
(airflow) branch main updated: Allow optional defaults in required fields with manual triggered dags (#31301)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3d7c8f7d05 Allow optional defaults in required fields with manual triggered dags (#31301)
3d7c8f7d05 is described below
commit 3d7c8f7d058284a4eb7e5e42eda56b3c284b7866
Author: Jens Scheffler <95...@users.noreply.github.com>
AuthorDate: Sat Oct 28 21:26:18 2023 +0200
Allow optional defaults in required fields with manual triggered dags (#31301)
---
airflow/example_dags/example_params_ui_tutorial.py | 24 +++++++--------------
airflow/models/dag.py | 2 +-
docs/apache-airflow/core-concepts/params.rst | 11 +++++++++-
tests/dags/test_invalid_param.py | 2 +-
...est_invalid_param.py => test_invalid_param2.py} | 10 ++++-----
...est_invalid_param.py => test_invalid_param3.py} | 12 +++++------
...est_invalid_param.py => test_invalid_param4.py} | 10 ++++-----
.../{test_invalid_param.py => test_valid_param.py} | 12 +++++++----
...{test_invalid_param.py => test_valid_param2.py} | 12 ++++++-----
tests/jobs/test_scheduler_job.py | 3 +++
tests/models/test_dagbag.py | 25 ++++++++++++++++++++--
11 files changed, 77 insertions(+), 46 deletions(-)
diff --git a/airflow/example_dags/example_params_ui_tutorial.py b/airflow/example_dags/example_params_ui_tutorial.py
index 4373676b6a..12992c545c 100644
--- a/airflow/example_dags/example_params_ui_tutorial.py
+++ b/airflow/example_dags/example_params_ui_tutorial.py
@@ -25,16 +25,10 @@ from __future__ import annotations
import datetime
import json
from pathlib import Path
-from typing import TYPE_CHECKING
from airflow.decorators import task
-from airflow.exceptions import AirflowSkipException
from airflow.models.dag import DAG
-from airflow.models.param import Param
-
-if TYPE_CHECKING:
- from airflow.models.dagrun import DagRun
- from airflow.models.taskinstance import TaskInstance
+from airflow.models.param import Param, ParamsDict
with DAG(
dag_id=Path(__file__).stem,
@@ -170,9 +164,11 @@ with DAG(
),
# Fields can be required or not. If the defined fields are typed they are getting required by default
# (else they would not pass JSON schema validation) - to make typed fields optional you must
- # permit the optional "null" type
+ # permit the optional "null" type.
+ # You can omit a default value if the DAG is triggered manually
"required_field": Param(
- "You can not trigger if no text is given here!",
+ # In this example there is no default value:
+ # the form requires users to supply a value before the DAG can be triggered
type="string",
title="Required text field",
description="This field is required. You can not submit without having text in here.",
@@ -303,13 +299,9 @@ with DAG(
},
) as dag:
- @task(task_id="show_params")
+ @task
def show_params(**kwargs) -> None:
- ti: TaskInstance = kwargs["ti"]
- dag_run: DagRun = ti.dag_run
- if not dag_run.conf:
- print("Uups, no parameters supplied as DagRun.conf, was the trigger w/o form?")
- raise AirflowSkipException("No DagRun.conf parameters supplied.")
- print(f"This DAG was triggered with the following parameters:\n{json.dumps(dag_run.conf, indent=4)}")
+ params: ParamsDict = kwargs["params"]
+ print(f"This DAG was triggered with the following parameters:\n\n{json.dumps(params, indent=4)}\n")
show_params()
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index d2540a250b..bef0bde9a7 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -726,7 +726,7 @@ class DAG(LoggingMixin):
f"inconsistent schedule: timetable {self.timetable.summary!r} "
f"does not match schedule_interval {self.schedule_interval!r}",
)
- self.params.validate()
+ self.validate_schedule_and_params()
self.timetable.validate()
self.validate_setup_teardown()
diff --git a/docs/apache-airflow/core-concepts/params.rst b/docs/apache-airflow/core-concepts/params.rst
index 42fd523688..b2b95252ec 100644
--- a/docs/apache-airflow/core-concepts/params.rst
+++ b/docs/apache-airflow/core-concepts/params.rst
@@ -160,6 +160,12 @@ JSON Schema Validation
},
):
+.. note::
+ If ``schedule`` is defined for a DAG, params with defaults must be valid. This is validated during DAG parsing.
+ If ``schedule=None``, params are not validated during DAG parsing but only when the DAG is triggered.
+ This is useful when the DAG author does not want to provide defaults but wants to force users to provide valid parameters
+ at trigger time.
+
.. note::
As of now, for security reasons, one can not use :class:`~airflow.models.param.Param` objects derived out of custom classes. We are
planning to have a registration system for custom :class:`~airflow.models.param.Param` classes, just like we've for Operator ExtraLinks.
@@ -298,7 +304,6 @@ The following features are supported in the Trigger UI Form:
-
- ``Param(None, type=["null", "string"])``
-
- If a form field is left empty, it is passed as ``None`` value to the params dict.
- Form fields are rendered in the order of definition of ``params`` in the DAG.
- If you want to add sections to the Form, add the attribute ``section`` to each field. The text will be used as section label.
@@ -310,6 +315,10 @@ The following features are supported in the Trigger UI Form:
If you want to change values manually, the JSON configuration can be adjusted. Changes are overridden when form fields change.
- If you want to render custom HTML as form on top of the provided features, you can use the ``custom_html_form`` attribute.
+.. note::
+ If the field is required, the default value must also be valid according to the schema. If the DAG is defined with
+ ``schedule=None``, parameter values are validated at trigger time.
+
For examples also please take a look to two example DAGs provided: ``example_params_trigger_ui`` and ``example_params_ui_tutorial``.
.. image:: ../img/trigger-dag-tutorial-form.png
diff --git a/tests/dags/test_invalid_param.py b/tests/dags/test_invalid_param.py
index 7e278cf02f..9f5fdfa05f 100644
--- a/tests/dags/test_invalid_param.py
+++ b/tests/dags/test_invalid_param.py
@@ -25,7 +25,7 @@ from airflow.operators.python import PythonOperator
with DAG(
"test_invalid_param",
start_date=datetime(2021, 1, 1),
- schedule="@once",
+ schedule="0 0 * * *",
params={
# a mandatory str param
"str_param": Param(type="string", minLength=2, maxLength=4),
diff --git a/tests/dags/test_invalid_param.py b/tests/dags/test_invalid_param2.py
similarity index 84%
copy from tests/dags/test_invalid_param.py
copy to tests/dags/test_invalid_param2.py
index 7e278cf02f..7ed9d5c443 100644
--- a/tests/dags/test_invalid_param.py
+++ b/tests/dags/test_invalid_param2.py
@@ -18,17 +18,17 @@ from __future__ import annotations
from datetime import datetime
-from airflow.models.dag import DAG
+from airflow import DAG
from airflow.models.param import Param
from airflow.operators.python import PythonOperator
with DAG(
- "test_invalid_param",
+ "test_invalid_param2",
start_date=datetime(2021, 1, 1),
- schedule="@once",
+ schedule="0 0 * * *",
params={
- # a mandatory str param
- "str_param": Param(type="string", minLength=2, maxLength=4),
+ # a mandatory str param but pass None as value which is invalid
+ "str_param": Param(default=None, type="string", minLength=2, maxLength=4),
},
) as the_dag:
diff --git a/tests/dags/test_invalid_param.py b/tests/dags/test_invalid_param3.py
similarity index 83%
copy from tests/dags/test_invalid_param.py
copy to tests/dags/test_invalid_param3.py
index 7e278cf02f..67cb1bbff7 100644
--- a/tests/dags/test_invalid_param.py
+++ b/tests/dags/test_invalid_param3.py
@@ -18,17 +18,17 @@ from __future__ import annotations
from datetime import datetime
-from airflow.models.dag import DAG
+from airflow import DAG
from airflow.models.param import Param
from airflow.operators.python import PythonOperator
with DAG(
- "test_invalid_param",
+ "test_invalid_param3",
start_date=datetime(2021, 1, 1),
- schedule="@once",
+ schedule="0 0 * * *",
params={
- # a mandatory str param
- "str_param": Param(type="string", minLength=2, maxLength=4),
+ # a mandatory number param but pass a string as default value
+ "int_param": Param(default="banana", type="integer"),
},
) as the_dag:
@@ -40,6 +40,6 @@ with DAG(
task_id="ref_params",
python_callable=print_these,
op_args=[
- "{{ params.str_param }}",
+ "{{ params.int_param }}",
],
)
diff --git a/tests/dags/test_invalid_param.py b/tests/dags/test_invalid_param4.py
similarity index 83%
copy from tests/dags/test_invalid_param.py
copy to tests/dags/test_invalid_param4.py
index 7e278cf02f..7c6354af0d 100644
--- a/tests/dags/test_invalid_param.py
+++ b/tests/dags/test_invalid_param4.py
@@ -18,17 +18,17 @@ from __future__ import annotations
from datetime import datetime
-from airflow.models.dag import DAG
+from airflow import DAG
from airflow.models.param import Param
from airflow.operators.python import PythonOperator
with DAG(
- "test_invalid_param",
+ "test_invalid_param4",
start_date=datetime(2021, 1, 1),
- schedule="@once",
+ schedule="0 0 * * *",
params={
- # a mandatory str param
- "str_param": Param(type="string", minLength=2, maxLength=4),
+ # a mandatory string but the default is not valid in length validation
+ "str_param": Param(default="banana", type="string", minLength=2, maxLength=4),
},
) as the_dag:
diff --git a/tests/dags/test_invalid_param.py b/tests/dags/test_valid_param.py
similarity index 75%
copy from tests/dags/test_invalid_param.py
copy to tests/dags/test_valid_param.py
index 7e278cf02f..67a6b4e814 100644
--- a/tests/dags/test_invalid_param.py
+++ b/tests/dags/test_valid_param.py
@@ -18,17 +18,21 @@ from __future__ import annotations
from datetime import datetime
-from airflow.models.dag import DAG
+from airflow import DAG
from airflow.models.param import Param
from airflow.operators.python import PythonOperator
with DAG(
- "test_invalid_param",
+ "test_valid_param",
start_date=datetime(2021, 1, 1),
- schedule="@once",
+ schedule=None,
params={
- # a mandatory str param
+ # a string default is not mandatory as DAG has no schedule
"str_param": Param(type="string", minLength=2, maxLength=4),
+ # a string with None as default is also accepted as no schedule
+ "str_param2": Param(None, type="string", minLength=2, maxLength=4),
+ # But of course adding a valid default is also fine
+ "str_param3": Param("valid_default", type="string", minLength=2, maxLength=15),
},
) as the_dag:
diff --git a/tests/dags/test_invalid_param.py b/tests/dags/test_valid_param2.py
similarity index 77%
copy from tests/dags/test_invalid_param.py
copy to tests/dags/test_valid_param2.py
index 7e278cf02f..9767357d3b 100644
--- a/tests/dags/test_invalid_param.py
+++ b/tests/dags/test_valid_param2.py
@@ -18,17 +18,19 @@ from __future__ import annotations
from datetime import datetime
-from airflow.models.dag import DAG
+from airflow import DAG
from airflow.models.param import Param
from airflow.operators.python import PythonOperator
with DAG(
- "test_invalid_param",
+ "test_valid_param2",
start_date=datetime(2021, 1, 1),
- schedule="@once",
+ schedule="0 0 * * *",
params={
- # a mandatory str param
- "str_param": Param(type="string", minLength=2, maxLength=4),
+ # mandatory string has default, this is how we want it!
+ "str_param": Param("some_default", type="string", minLength=2, maxLength=12),
+ # Field does not need to have a default if type is nullable
+ "optional_str_param": Param(None, type=["null", "string"]),
},
) as the_dag:
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 5e83c77a64..43cf0dc282 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3110,6 +3110,9 @@ class TestSchedulerJob:
"test_invalid_dup_task.py",
"test_ignore_this.py",
"test_invalid_param.py",
+ "test_invalid_param2.py",
+ "test_invalid_param3.py",
+ "test_invalid_param4.py",
"test_nested_dag.py",
"test_imports.py",
"__init__.py",
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index e204a16d10..8ebff1dc16 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -284,9 +284,14 @@ class TestDagBag:
def test_process_file_invalid_param_check(self, tmp_path):
"""
- test if an invalid param in the dag param can be identified
+ test that DAGs with invalid params are detected and reported as import errors
"""
- invalid_dag_files = ["test_invalid_param.py"]
+ invalid_dag_files = [
+ "test_invalid_param.py",
+ "test_invalid_param2.py",
+ "test_invalid_param3.py",
+ "test_invalid_param4.py",
+ ]
dagbag = DagBag(dag_folder=os.fspath(tmp_path), include_examples=False)
assert len(dagbag.import_errors) == 0
@@ -295,6 +300,22 @@ class TestDagBag:
assert len(dagbag.import_errors) == len(invalid_dag_files)
assert len(dagbag.dags) == 0
+ def test_process_file_valid_param_check(self, tmp_path):
+ """
+ test that DAGs with valid params load without import errors (positive test)
+ """
+ valid_dag_files = [
+ "test_valid_param.py",
+ "test_valid_param2.py",
+ ]
+ dagbag = DagBag(dag_folder=os.fspath(tmp_path), include_examples=False)
+
+ assert len(dagbag.import_errors) == 0
+ for file in valid_dag_files:
+ dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, file))
+ assert len(dagbag.import_errors) == 0
+ assert len(dagbag.dags) == len(valid_dag_files)
+
@patch.object(DagModel, "get_current")
def test_get_dag_without_refresh(self, mock_dagmodel):
"""