Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/08/03 10:57:55 UTC

[GitHub] [airflow] ashb commented on a change in pull request #17100: WIP: Advanced Params using json-schema

ashb commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r681618325



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -251,10 +252,12 @@ def post_dag_run(dag_id, session):
         .first()
     )
     if not dagrun_instance:
-        dag_run = DagRun(dag_id=dag_id, run_type=DagRunType.MANUAL, **post_body)
-        session.add(dag_run)
-        session.commit()
-        return dagrun_schema.dump(dag_run)
+        try:
+            dag = current_app.dag_bag.get_dag(dag_id)
+            dag_run = dag.create_dagrun(run_type=DagRunType.MANUAL, state=State.RUNNING, **post_body)

Review comment:
       Why did we change this call to add a state here?
   
    As of a recent-ish change only the Scheduler should ever put a DagRun into the RUNNING state; everything else should create/put it in the QUEUED state.
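
    For illustration only, a minimal sketch (not part of this diff) of the same call creating the run in QUEUED state and leaving the promotion to RUNNING to the scheduler:

    ```
    from flask import current_app

    from airflow.utils.state import State
    from airflow.utils.types import DagRunType

    dag = current_app.dag_bag.get_dag(dag_id)  # dag_id and post_body come from the request, as above
    dag_run = dag.create_dagrun(
        run_type=DagRunType.MANUAL,
        state=State.QUEUED,  # only the scheduler should move a DagRun to RUNNING
        **post_body,
    )
    ```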

##########
File path: airflow/cli/commands/task_command.py
##########
@@ -404,7 +404,14 @@ def task_test(args, dag=None):
     # Add CLI provided task_params to task.params
     if args.task_params:
         passed_in_params = json.loads(args.task_params)
+        for k, v in task.params.items():
+            if k in passed_in_params:
+                v.default = passed_in_params.pop(k)
+                v()  # to raise if there are any validation issues

Review comment:
       This looks like it would fail for basic/non-validating param types
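
    A rough sketch of one way to guard that loop (keeping the PR's pattern of assigning to `.default`, and assuming `Param` is importable from `airflow.models.param` as in this PR), with plain values passed through untouched:

    ```
    from airflow.models.param import Param

    for k, v in task.params.items():
        if k not in passed_in_params:
            continue
        if isinstance(v, Param):
            v.default = passed_in_params.pop(k)
            v()  # raise if there are any validation issues
        else:
            task.params[k] = passed_in_params.pop(k)  # plain value, nothing to validate
    ```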

##########
File path: airflow/example_dags/example_complex_params.py
##########
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the complex params."""
+
+from airflow import DAG
+from airflow.models.param import Param
+from airflow.operators.bash import BashOperator
+from airflow.utils.dates import days_ago
+
+with DAG(
+    "example_complex_params",
+    default_args={
+        "owner": "airflow",
+    },
+    params={
+        'int_param': Param(10, type="integer", minimum=0, maximum=20),  # non default int param
+        'str_param': Param(type="string", minLength=2, maxLength=4),  # a mandatory str param
+        'old_param': 'old_way_of_passing',
+        'simple_param': Param('im_just_like_old_param'),  # i.e. no type checking
+        'email_param': Param(
+            'example@example.com', type='string', pattern='\\S+@\\S+\\.\\S+$', minLength=6, maxLength=255

Review comment:
       Assuming this is a JSON Schema?
   
   ```suggestion
               'example@example.com', type='string', format='idn-email', minLength=5, maxLength=255
   ```

##########
File path: airflow/models/dag.py
##########
@@ -2312,6 +2327,19 @@ def set_edge_info(self, upstream_task_id: str, downstream_task_id: str, info: Ed
         """
         self.edge_info.setdefault(upstream_task_id, {})[downstream_task_id] = info
 
+    def validate_schedule_and_params(self):

Review comment:
       ```suggestion
       def validate_schedule_and_params(self):
           if self.schedule_interval is None:
               return
   ```
   
   and then we can eliminate one of the conditions below

##########
File path: airflow/models/dag.py
##########
@@ -2312,6 +2327,19 @@ def set_edge_info(self, upstream_task_id: str, downstream_task_id: str, info: Ed
         """
         self.edge_info.setdefault(upstream_task_id, {})[downstream_task_id] = info
 
+    def validate_schedule_and_params(self):
+        for k, v in self.params.items():
+            if isinstance(v, Param):
+                if (
+                    v.default is None
+                    and self.schedule_interval is not None
+                    and ('type' not in v.schema or "null" not in v.schema['type'])
+                ):

Review comment:
       `"null" not in v.schema['type']` looks wrong -- shouldn't it be
   
   ```suggestion
                   if (
                       v.default is None
                       and self.schedule_interval is not None
                       and ('type' not in v.schema or v.schema['type'] != "null")
                   ):
   ```
   
   

##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any
+
+from jsonschema import Draft4Validator, FormatChecker
+from jsonschema.exceptions import ValidationError
+
+
+class Param:
+    """
+    Class to hold the default value of a Param and rule set to do the validations
+
+    :param default: The value of this Param object holds
+    :type default: Any
+    :param description: Optional help text for the Param
+    :type description: str
+    :param schema: The validation schema of the Param, if not given then all kwargs except
+    default & description will form the schema
+    :type schema: dict
+    """
+
+    def __init__(self, default: Any = None, description: str = None, **kwargs):
+        self.default = default
+        self.description = description
+        self.schema = kwargs.pop('schema') if 'schema' in kwargs else kwargs
+
+        # If default is not None, then validate it once, may raise ValueError
+        if default:
+            try:
+                validator = Draft4Validator(self.schema, format_checker=FormatChecker())
+                validator.validate(self.default)

Review comment:
       We should check the schema instead, using `Draft7Validator.check_schema` - https://python-jsonschema.readthedocs.io/en/latest/validate/#jsonschema.IValidator.check_schema
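
    A sketch of what that up-front check could look like inside `Param.__init__` (assuming `self.schema` as defined in this file):

    ```
    from jsonschema import Draft7Validator
    from jsonschema.exceptions import SchemaError

    try:
        Draft7Validator.check_schema(self.schema)  # raises SchemaError for a malformed schema
    except SchemaError as err:
        raise ValueError(f"Param schema is not valid JSON Schema: {err}") from None
    ```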

##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -789,6 +825,27 @@ def from_dict(cls, serialized_obj: dict) -> 'SerializedDAG':
             raise ValueError(f"Unsure how to deserialize version {ver!r}")
         return cls.deserialize_dag(serialized_obj['dag'])
 
+    @classmethod
+    def _serialize_dag_params(cls, dag_params: Dict[str, Param]):
+        """ """
+        serialized_params = {}
+        for k, v in dag_params.items():
+            serialized_params[k] = v.__dict__
+            serialized_params[k].update({'__type': f'{v.__class__.__module__}.{v.__class__.__name__}'})
+
+        return serialized_params
+
+    @classmethod
+    def _deserialize_dag_params(cls, encoded_dag_params: Dict) -> Dict[str, Param]:
+        """ """
+        op_params = {}
+        for k, v in encoded_dag_params.items():
+            param_class = import_string(v['__type'])

Review comment:
       This is potentially risky (importing an arbitrary string and then instantiating an object of that class) -- everywhere else in Airflow we require classes that we deserialize to be registered in some way, to protect against what I've been calling "object inflation attacks". (The canonical example is an old Ruby on Rails bug: https://codeclimate.com/blog/rails-remote-code-execution-vulnerability-explained/ )
   
   Right now this input is just about trusted, but I've been operating on the principle that it isn't and shouldn't be, as either we'll use this elsewhere in Airflow, or open it up to API submission.
   
   Not sure 100% what the right answer is here, but right now this is against what we've done elsewhere in Airflow.
   
   Options I can think of:
   
    1. Don't allow this to be pluggable -- so this is _only_ allowed to be an `airflow.models.param.Param` (a rough sketch of this follows below)
   2. Have some kind of registration like we have with OperatorExtraLinks.
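
    A hypothetical shape for option 1 -- inflating only the known `Param` class rather than importing whatever dotted path is in the serialized blob (the helper name and the `__class` key are illustrative only):

    ```
    from airflow.models.param import Param

    _ALLOWED_PARAM_CLASSES = {f'{Param.__module__}.{Param.__name__}': Param}

    def _deserialize_param(encoded: dict) -> Param:
        encoded = dict(encoded)  # don't mutate the caller's dict
        class_name = encoded.pop('__class', None)
        param_class = _ALLOWED_PARAM_CLASSES.get(class_name)
        if param_class is None:
            raise ValueError(f"Refusing to deserialize unknown Param class {class_name!r}")
        param = param_class.__new__(param_class)  # skip __init__; attributes come from the blob
        param.__dict__.update(encoded)
        return param
    ```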
   
   
   

##########
File path: airflow/models/dag.py
##########
@@ -2312,6 +2327,19 @@ def set_edge_info(self, upstream_task_id: str, downstream_task_id: str, info: Ed
         """
         self.edge_info.setdefault(upstream_task_id, {})[downstream_task_id] = info
 
+    def validate_schedule_and_params(self):

Review comment:
       @uranusjr What is the best way of checking this given the introduction of timetables?
   
   Should this be this?
   
   ```suggestion
       def validate_schedule_and_params(self):
           if isinstance(self.timetable, NullTimetable):
               return
    ```

##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -789,6 +825,27 @@ def from_dict(cls, serialized_obj: dict) -> 'SerializedDAG':
             raise ValueError(f"Unsure how to deserialize version {ver!r}")
         return cls.deserialize_dag(serialized_obj['dag'])
 
+    @classmethod
+    def _serialize_dag_params(cls, dag_params: Dict[str, Param]):
+        """ """
+        serialized_params = {}
+        for k, v in dag_params.items():
+            serialized_params[k] = v.__dict__
+            serialized_params[k].update({'__type': f'{v.__class__.__module__}.{v.__class__.__name__}'})

Review comment:
       Additionally: everywhere else `__type` is one of an enum, so this should be `__class` or similar.

##########
File path: airflow/cli/commands/dag_command.py
##########
@@ -104,21 +104,24 @@ def dag_backfill(args, dag=None):
                 dag_run_state=State.NONE,
             )
 
-        dag.run(
-            start_date=args.start_date,
-            end_date=args.end_date,
-            mark_success=args.mark_success,
-            local=args.local,
-            donot_pickle=(args.donot_pickle or conf.getboolean('core', 'donot_pickle')),
-            ignore_first_depends_on_past=args.ignore_first_depends_on_past,
-            ignore_task_deps=args.ignore_dependencies,
-            pool=args.pool,
-            delay_on_limit_secs=args.delay_on_limit,
-            verbose=args.verbose,
-            conf=run_conf,
-            rerun_failed_tasks=args.rerun_failed_tasks,
-            run_backwards=args.run_backwards,
-        )
+        try:
+            dag.run(
+                start_date=args.start_date,
+                end_date=args.end_date,
+                mark_success=args.mark_success,
+                local=args.local,
+                donot_pickle=(args.donot_pickle or conf.getboolean('core', 'donot_pickle')),
+                ignore_first_depends_on_past=args.ignore_first_depends_on_past,
+                ignore_task_deps=args.ignore_dependencies,
+                pool=args.pool,
+                delay_on_limit_secs=args.delay_on_limit,
+                verbose=args.verbose,
+                conf=run_conf,
+                rerun_failed_tasks=args.rerun_failed_tasks,
+                run_backwards=args.run_backwards,
+            )
+        except ValueError as vr:
+            logging.error(str(vr))

Review comment:
       ?

##########
File path: airflow/example_dags/example_complex_params.py
##########
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the complex params."""
+
+from airflow import DAG
+from airflow.models.param import Param
+from airflow.operators.bash import BashOperator
+from airflow.utils.dates import days_ago
+
+with DAG(
+    "example_complex_params",
+    default_args={
+        "owner": "airflow",
+    },

Review comment:
       Not needed/is already default
   
   ```suggestion
   ```

##########
File path: airflow/models/dag.py
##########
@@ -2020,6 +2025,15 @@ def create_dagrun(
                 "Creating DagRun needs either `run_id` or both `run_type` and `execution_date`"
             )
 
+        try:
+            for k, v in copy.deepcopy(self.params).items():
+                if isinstance(v, Param):
+                    if conf and k in conf:
+                        v.default = conf[k]
+                    v()
+        except ValueError as ve:
+            raise ValueError(f"Invalid input for param '{k}': {ve}")

Review comment:
       ```suggestion
               raise ValueError(f"Invalid input for param '{k}': {ve}") from None
   ```
   
   Otherwise we'll see two stack traces here which is likely confusing and doesn't help our users.
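
    A toy, standalone illustration of the difference (plain Python, nothing Airflow-specific):

    ```
    import traceback

    def reraise(suppress: bool) -> None:
        try:
            raise KeyError("inner validation failure")
        except KeyError as ve:
            if suppress:
                raise ValueError(f"Invalid input for param 'x': {ve}") from None  # no chained traceback
            raise ValueError(f"Invalid input for param 'x': {ve}")

    for suppress in (False, True):
        try:
            reraise(suppress)
        except ValueError:
            traceback.print_exc()  # two chained tracebacks without `from None`, one with it
    ```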

##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any
+
+from jsonschema import Draft4Validator, FormatChecker
+from jsonschema.exceptions import ValidationError
+
+
+class Param:

Review comment:
       Hmmm, having airflow.models.param.Param and airflow.models.dagparam.DagParam is confusing -- do we still need both?

    Probably, as they do different things, but the naming is going to lead to confusion 🤔

##########
File path: airflow/example_dags/example_complex_params.py
##########
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the complex params."""
+
+from airflow import DAG
+from airflow.models.param import Param
+from airflow.operators.bash import BashOperator
+from airflow.utils.dates import days_ago
+
+with DAG(
+    "example_complex_params",
+    default_args={
+        "owner": "airflow",
+    },
+    params={
+        'int_param': Param(10, type="integer", minimum=0, maximum=20),  # non default int param
+        'str_param': Param(type="string", minLength=2, maxLength=4),  # a mandatory str param
+        'old_param': 'old_way_of_passing',
+        'simple_param': Param('im_just_like_old_param'),  # i.e. no type checking
+        'email_param': Param(
+            'example@example.com', type='string', pattern='\\S+@\\S+\\.\\S+$', minLength=6, maxLength=255
+        ),
+    },
+    schedule_interval=None,
+    start_date=days_ago(1),
+    tags=['example'],
+) as dag:
+    all_params = BashOperator(
+        task_id='all_param',
+        bash_command="echo {{ params.int_param }} {{ params.str_param }} {{ params.old_param }} "
+        "{{ params.simple_param }} {{ params.email_param }} {{ params.task_param }}",

Review comment:
       This is prone to escaping bugs:
   
   ```suggestion
           bash_command="echo {{ params.int_param!r }} {{ params.str_param!r }} {{ params.old_param!r }} "
           "{{ params.simple_param!r }} {{ params.email_param!r }} {{ params.task_param!r }}",
   ```

##########
File path: airflow/models/dag.py
##########
@@ -2020,6 +2025,15 @@ def create_dagrun(
                 "Creating DagRun needs either `run_id` or both `run_type` and `execution_date`"
             )
 
+        try:
+            for k, v in copy.deepcopy(self.params).items():
+                if isinstance(v, Param):
+                    if conf and k in conf:
+                        v.default = conf[k]
+                    v()

Review comment:
       We should delay the deepcopy until as late as possible
   ```suggestion
               for k, v in self.params.items():
                   if isinstance(v, Param):
                       if conf and k in conf:
                           v = copy.deepcopy(v)
                           v.default = conf[k]
                       v()
   ```

##########
File path: airflow/models/taskinstance.py
##########
@@ -1749,7 +1761,13 @@ def overwrite_params_with_dag_run_conf(self, params, dag_run):
         """Overwrite Task Params with DagRun.conf"""
         if dag_run and dag_run.conf:
             self.log.debug("Updating task params (%s) with DagRun.conf (%s)", params, dag_run.conf)
-            params.update(dag_run.conf)
+            for k, v in dag_run.conf.items():
+                if k in params:
+                    params[k].default = v

Review comment:
       Same comment here about setting default (we shouldn't)

##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any
+
+from jsonschema import Draft4Validator, FormatChecker
+from jsonschema.exceptions import ValidationError
+
+
+class Param:
+    """
+    Class to hold the default value of a Param and rule set to do the validations
+
+    :param default: The value of this Param object holds
+    :type default: Any
+    :param description: Optional help text for the Param
+    :type description: str
+    :param schema: The validation schema of the Param, if not given then all kwargs except
+    default & description will form the schema
+    :type schema: dict
+    """
+
+    def __init__(self, default: Any = None, description: str = None, **kwargs):
+        self.default = default
+        self.description = description
+        self.schema = kwargs.pop('schema') if 'schema' in kwargs else kwargs
+
+        # If default is not None, then validate it once, may raise ValueError
+        if default:
+            try:
+                validator = Draft4Validator(self.schema, format_checker=FormatChecker())

Review comment:
       Draft 4 is old; we should be using 2019-09 or whatever the latest draft is that the jsonschema module supports (Draft 7?)
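
    A sketch of the same validation on a newer draft; only the validator class changes (Draft 7 shown here, assuming the installed jsonschema supports it):

    ```
    from jsonschema import Draft7Validator, FormatChecker
    from jsonschema.exceptions import ValidationError

    schema = {"type": "string", "format": "idn-email", "minLength": 5, "maxLength": 255}
    try:
        Draft7Validator(schema, format_checker=FormatChecker()).validate("example@example.com")
    except ValidationError as err:
        raise ValueError(err) from None
    ```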

##########
File path: airflow/models/taskinstance.py
##########
@@ -1522,7 +1523,7 @@ def is_eligible_to_retry(self):
         return self.task.retries and self.try_number <= self.max_tries
 
     @provide_session
-    def get_template_context(self, session=None) -> Context:
+    def get_template_context(self, ignore_param_exceptions=True, session=None) -> Context:

Review comment:
       Why is the default to ignore exceptions?

##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any
+
+from jsonschema import Draft4Validator, FormatChecker
+from jsonschema.exceptions import ValidationError
+
+
+class Param:
+    """
+    Class to hold the default value of a Param and rule set to do the validations
+
+    :param default: The value of this Param object holds
+    :type default: Any
+    :param description: Optional help text for the Param
+    :type description: str
+    :param schema: The validation schema of the Param, if not given then all kwargs except
+    default & description will form the schema
+    :type schema: dict
+    """
+
+    def __init__(self, default: Any = None, description: str = None, **kwargs):
+        self.default = default
+        self.description = description
+        self.schema = kwargs.pop('schema') if 'schema' in kwargs else kwargs
+
+        # If default is not None, then validate it once, may raise ValueError
+        if default:
+            try:
+                validator = Draft4Validator(self.schema, format_checker=FormatChecker())
+                validator.validate(self.default)
+            except ValidationError as err:
+                raise ValueError(err)
+
+    def __call__(self, suppress_exception: bool = False) -> Any:

Review comment:
       Using self.default in here for _the actual value_ seems odd -- I would have expected this instead to be
   
   ```suggestion
       def __call__(self, value: Optional[T], suppress_exception: bool = False) -> T:
   ```
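
    One hypothetical shape for such a `__call__`, resolving an explicit value against the default and validating the result (a sketch, not the PR's implementation):

    ```
    from typing import Any, Optional

    from jsonschema import Draft7Validator, FormatChecker
    from jsonschema.exceptions import ValidationError

    class Param:
        def __init__(self, default: Any = None, description: Optional[str] = None, **kwargs):
            self.default = default
            self.description = description
            self.schema = kwargs.pop('schema', kwargs)

        def __call__(self, value: Any = None, suppress_exception: bool = False) -> Any:
            final_val = self.default if value is None else value
            try:
                Draft7Validator(self.schema, format_checker=FormatChecker()).validate(final_val)
            except ValidationError as err:
                if suppress_exception:
                    return None
                raise ValueError(err) from None
            return final_val
    ```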

##########
File path: tests/models/test_dagparam.py
##########
@@ -76,7 +76,7 @@ def return_num(num):
 
             xcom_arg = return_num(value)
 
-        assert dag.params['value'] == self.VALUE
+        assert dag.params['value']() == self.VALUE

Review comment:
       Hmmmm, I wonder if this makes it a breaking change.

##########
File path: airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
##########
@@ -85,7 +85,6 @@ def __init__(
         bucket_name: str,
         object_name: str,
         fields: List[str],
-        params: Dict[str, Any],

Review comment:
       I _suspect_ that this params was meant to be different to the params from the base operator and this is a name collision (that until now hasn't caused any problems)

##########
File path: airflow/models/dag.py
##########
@@ -2020,6 +2025,15 @@ def create_dagrun(
                 "Creating DagRun needs either `run_id` or both `run_type` and `execution_date`"
             )
 
+        try:
+            for k, v in copy.deepcopy(self.params).items():
+                if isinstance(v, Param):
+                    if conf and k in conf:
+                        v.default = conf[k]
+                    v()

Review comment:
       Actually -- we shouldn't ever change v.default -- overwriting the default with the provided value seems like an odd pattern.
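
    A rough sketch of validating a conf-supplied value without touching `default`, assuming a `__call__` that accepts the value (as sketched in the `airflow/models/param.py` thread above):

    ```
    for k, v in self.params.items():
        if not isinstance(v, Param):
            continue
        try:
            v(conf[k] if conf and k in conf else None)
        except ValueError as ve:
            raise ValueError(f"Invalid input for param '{k}': {ve}") from None
    ```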

##########
File path: airflow/models/dag.py
##########
@@ -1976,7 +1981,7 @@ def create_dagrun(
         run_id: Optional[str] = None,
         start_date: Optional[datetime] = None,
         external_trigger: Optional[bool] = False,
-        conf: Optional[dict] = None,
+        conf: Optional[dict] = {},

Review comment:
       This is only a problem if/when `dagrun.conf` is ever edited in place (by adding keys etc) -- and given we can't tell where `dagrun.conf` might be used elsewhere in the codebase it would be safer to change this, because while it might not cause problems _right now_ it might bite us later.
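
    A toy reproduction of the mutable-default-argument pitfall being flagged (plain Python, unrelated to Airflow):

    ```
    def create(conf: dict = {}):
        conf.setdefault('triggered', True)  # edits the single shared default dict in place
        return conf

    first = create()
    second = create()
    print(first is second)  # True -- every call without conf reuses (and sees) the same dict
    ```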

##########
File path: airflow/www/views.py
##########
@@ -1560,19 +1562,24 @@ def trigger(self, session=None):
                     'airflow/trigger.html', dag_id=dag_id, origin=origin, conf=request_conf, form=form
                 )
 
-        dag = current_app.dag_bag.get_dag(dag_id)
-
         if unpause and dag.is_paused:
             models.DagModel.get_dagmodel(dag_id).set_is_paused(is_paused=False)
 
-        dag.create_dagrun(
-            run_type=DagRunType.MANUAL,
-            execution_date=execution_date,
-            state=State.QUEUED,
-            conf=run_conf,
-            external_trigger=True,
-            dag_hash=current_app.dag_bag.dags_hash.get(dag_id),
-        )
+        try:
+            dag.create_dagrun(
+                run_type=DagRunType.MANUAL,
+                execution_date=execution_date,
+                state=State.RUNNING,

Review comment:
       ```suggestion
                   state=State.QUEUED,
   ```

##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -789,6 +825,27 @@ def from_dict(cls, serialized_obj: dict) -> 'SerializedDAG':
             raise ValueError(f"Unsure how to deserialize version {ver!r}")
         return cls.deserialize_dag(serialized_obj['dag'])
 
+    @classmethod
+    def _serialize_dag_params(cls, dag_params: Dict[str, Param]):
+        """ """
+        serialized_params = {}
+        for k, v in dag_params.items():
+            serialized_params[k] = v.__dict__
+            serialized_params[k].update({'__type': f'{v.__class__.__module__}.{v.__class__.__name__}'})

Review comment:
       Isn't this the same as `v.dump()`?

##########
File path: airflow/models/taskinstance.py
##########
@@ -1590,6 +1591,17 @@ def get_template_context(self, session=None) -> Context:
         if conf.getboolean('core', 'dag_run_conf_overrides_params'):
             self.overwrite_params_with_dag_run_conf(params=params, dag_run=dag_run)
 
+        # Now update params dict with simple values
+        for k, v in params.items():
+            if isinstance(v, Param):
+                try:
+                    params[k] = v()
+                except ValueError:
+                    if ignore_param_exceptions:
+                        params[k] = None
+                    else:
+                        raise

Review comment:
       ```suggestion
                params[k] = v(suppress_exception=ignore_param_exceptions)
   ```

##########
File path: airflow/models/dag.py
##########
@@ -2312,6 +2327,19 @@ def set_edge_info(self, upstream_task_id: str, downstream_task_id: str, info: Ed
         """
         self.edge_info.setdefault(upstream_task_id, {})[downstream_task_id] = info
 
+    def validate_schedule_and_params(self):
+        for k, v in self.params.items():
+            if isinstance(v, Param):
+                if (
+                    v.default is None
+                    and self.schedule_interval is not None
+                    and ('type' not in v.schema or "null" not in v.schema['type'])
+                ):

Review comment:
       And by moving the schedule_interval check up we can simplify this to one line
   ```suggestion
                   if v.default is None and ('type' not in v.schema or v.schema['type'] != 'null'):
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org