Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/07/20 06:01:00 UTC

[GitHub] [airflow] msumit opened a new pull request #17100: WIP: Advanced Params using json-schema

msumit opened a new pull request #17100:
URL: https://github.com/apache/airflow/pull/17100


   Airflow accepts a params dictionary at the DAG level or at the task level, and its values can be overridden by providing dag_run.conf values. However, this params dictionary is quite static and doesn't add much value.
   
   The community has already made several requests for this, e.g. https://github.com/apache/airflow/issues/11054, https://github.com/apache/airflow/issues/16430, and https://github.com/apache/airflow/issues/17085
   
   ## Goal
   
   - Keep backward compatibility, i.e. simple params should keep working as they do today.
   - Params should support a default value, different types (int, bool, str, etc.), and various options to validate user input.
   - The UI should show the proper input control for each param type, indicate which params are required and which are optional, and pre-fill any default values.
   - Ideally, the UI could also show an options list or do live pattern matching when a param uses them.
   - Airflow should honor these params even when a DAG is triggered via the CLI or API.
   
   ## Proposal
   
   - Create a new class (or set of classes), say `Param`, that can be used in place of the value part of the params dictionary.
   - This class should hold the default value and validation rules as well.
   - There should be a method that validates & resolves the value of this Param class. The value could be the default one or provided by the user.
   - It should be easy to serialize it to and deserialize it from the DB, and to use it wherever a plain params value is used.
   - Should work with the standard way of DAG creation as well as with the new DAG decorator.
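The proposal above can be sketched roughly as follows. This is a hypothetical, stdlib-only illustration, not the PR's `airflow/models/param.py` (which delegates to the `jsonschema` library): the keyword arguments become a JSON-Schema-style dict, and `resolve()` returns the user value or the default after checking a small subset of keywords (`type`, `minimum`, `maximum`, `minLength`, `maxLength`, `pattern`).

```python
import re
from typing import Any, Dict, Optional

# Subset of JSON Schema type names mapped to Python types (arrays/objects omitted).
_JSON_TYPES = {
    "string": str,
    "integer": int,
    "number": (int, float),
    "boolean": bool,
    "null": type(None),
}


class Param:
    """Hypothetical sketch: a default value plus JSON-Schema-style validation rules."""

    def __init__(self, default: Any = None, description: Optional[str] = None, **schema: Any):
        self.default = default
        self.description = description
        # All remaining kwargs (type, minimum, maxLength, pattern, ...) form the schema.
        self.schema: Dict[str, Any] = schema

    def resolve(self, value: Any = None) -> Any:
        """Return the user-supplied value, or the default, after validating it."""
        final = self.default if value is None else value
        self._validate(final)
        return final

    def _validate(self, value: Any) -> None:
        rules = self.schema
        json_type = rules.get("type")
        if json_type is not None:
            # bool subclasses int in Python, but JSON keeps booleans and numbers apart.
            is_bool_as_number = isinstance(value, bool) and json_type in ("integer", "number")
            if is_bool_as_number or not isinstance(value, _JSON_TYPES[json_type]):
                raise ValueError(f"{value!r} is not of type {json_type!r}")
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            if "minimum" in rules and value < rules["minimum"]:
                raise ValueError(f"{value!r} is less than the minimum of {rules['minimum']}")
            if "maximum" in rules and value > rules["maximum"]:
                raise ValueError(f"{value!r} is greater than the maximum of {rules['maximum']}")
        if isinstance(value, str):
            if len(value) < rules.get("minLength", 0):
                raise ValueError(f"{value!r} is too short")
            if "maxLength" in rules and len(value) > rules["maxLength"]:
                raise ValueError(f"{value!r} is too long")
            # Like JSON Schema, "pattern" is a search, not a full match.
            if "pattern" in rules and re.search(rules["pattern"], value) is None:
                raise ValueError(f"{value!r} does not match {rules['pattern']!r}")
```

With this sketch, `Param(10, type="integer", minimum=0, maximum=20).resolve(25)` raises `ValueError`, while a schema-less `Param('im_just_like_old_param')` behaves like an old-style value.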
   
   ## Approaches
   
   ### [pydantic](https://pydantic-docs.helpmanual.io/)
   
   Pydantic is one of the fastest Python libraries for data and type validation ([benchmark](https://pydantic-docs.helpmanual.io/benchmarks/)). I implemented various params classes with it (see [sample](https://gist.github.com/msumit/1a596f3a98f411dae891a42cd13e2812)) but did not like having to write a separate validator for each field. Also, the order in which fields are defined determines which of them can be accessed in those validator methods.
   
   ### [attrs](https://pypi.org/project/attrs/)
   
   I have used attrs before, and it's already in use within Airflow. attrs simplifies writing classes and exposes various built-in validators as well as pre/post-init hooks. Using attrs it was quite easy to create these classes (see [this](https://github.com/astronomer/airflow/blob/dag_params/airflow/models/params.py)), though we have to write the data-validation logic ourselves. We also felt that users would keep asking for more such validations, and that code could turn into a big pile of its own.
   
   ### [json-schema](https://json-schema.org/understanding-json-schema/)
   
   We already use json-schema for DAG serialization. json-schema offers a very powerful, extensive, and language-agnostic way to define properties (validations) on a field, and it has implementations in almost all major [languages](https://json-schema.org/implementations.html). The custom code needed on top of it is minimal ([here](https://github.com/astronomer/airflow/blob/simple_params/airflow/models/param.py)), yet it provides very extensive validation.
   
   We should also be able to use its JavaScript implementation to validate data in the UI itself. The only concern is that json-schema rules can easily become complex, and users might find them hard to read and understand.
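Because the rules are plain JSON Schema, a `Param` can be stored as ordinary JSON and handed unchanged to any json-schema implementation, including a JavaScript one in the UI. A minimal round-trip sketch, assuming a hypothetical `{"default": ..., "schema": ...}` layout rather than Airflow's actual serialized DAG format; plain old-style values are wrapped as schema-less params, similar to what the PR's `_serialize_dag_params` does for `str` values.

```python
import json
from typing import Any, Dict


class Param:
    """Hypothetical minimal Param: a default value plus a JSON Schema dict."""

    def __init__(self, default: Any = None, **schema: Any):
        self.default = default
        self.schema = schema

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Param):
            return NotImplemented
        return (self.default, self.schema) == (other.default, other.schema)


def serialize_params(params: Dict[str, Any]) -> str:
    """Encode params as JSON; plain values become schema-less Params first."""
    encoded = {}
    for name, value in params.items():
        param = value if isinstance(value, Param) else Param(value)
        encoded[name] = {"default": param.default, "schema": param.schema}
    return json.dumps(encoded, sort_keys=True)


def deserialize_params(data: str) -> Dict[str, Param]:
    """Rebuild Param objects from the JSON produced by serialize_params."""
    return {name: Param(item["default"], **item["schema"]) for name, item in json.loads(data).items()}
```

The `schema` part of each entry is already a valid JSON Schema document, which is what makes client-side validation possible without translating the rules.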
   
   
   ### Trigger DAG page
   <img width="1056" alt="Screenshot 2021-07-15 at 2 06 36 PM" src="https://user-images.githubusercontent.com/2018407/126268835-f833c15a-cf9e-4d67-a242-d734fff43af9.png">
   <img width="1159" alt="Screenshot 2021-07-15 at 2 08 36 PM" src="https://user-images.githubusercontent.com/2018407/126268847-4ea5d70c-70a7-49f6-8a40-36ea03f54dbd.png">
   
   ### DAG details API
   <img width="354" alt="Screenshot 2021-07-15 at 2 10 18 PM" src="https://user-images.githubusercontent.com/2018407/126268850-4c853358-f146-4aa5-b6cc-187aac6180d8.png">
   
   ### DAG Trigger API
   <img width="1413" alt="Screenshot 2021-07-15 at 2 11 15 PM" src="https://user-images.githubusercontent.com/2018407/126268853-5706f1dd-bf87-4d33-a641-9f93d054c635.png">
   
   ### DAG trigger via CLI
   ```
   $ airflow dags trigger example_complex_params --conf '{"str_param": "hello"}'
   
   ValueError: Invalid input for param 'str_param': 'hello' is too long
   
   Failed validating 'maxLength' in schema:
       {'maxLength': 4, 'minLength': 2, 'type': 'string'}
   
   On instance:
       'hello'
   ```
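The trigger flow can be sketched as: overlay `dag_run.conf` on the DAG's params, validate each `Param`, and fail with a message shaped like the one above. This is a hypothetical stdlib-only illustration that supports only `type`, `minLength` and `maxLength`; the PR delegates validation to the `jsonschema` library, and conf keys that are not declared params are simply ignored here.

```python
from typing import Any, Dict, Optional

# Tiny subset of JSON Schema types, enough for this illustration.
_TYPES = {"string": str, "integer": int, "boolean": bool}


class Param:
    """Hypothetical minimal Param: a default plus a JSON-Schema-style dict."""

    def __init__(self, default: Any = None, **schema: Any):
        self.default = default
        self.schema = schema


def _check(value: Any, schema: Dict[str, Any]) -> Optional[str]:
    """Return an error message, or None if value satisfies the supported keywords."""
    expected = schema.get("type")
    if expected is not None:
        ok = isinstance(value, _TYPES[expected])
        if expected == "integer" and isinstance(value, bool):
            ok = False  # bool subclasses int, but true/false are not JSON integers
        if not ok:
            return f"{value!r} is not of type {expected!r}"
    if isinstance(value, str):
        if "maxLength" in schema and len(value) > schema["maxLength"]:
            return f"{value!r} is too long"
        if len(value) < schema.get("minLength", 0):
            return f"{value!r} is too short"
    return None


def resolve_dag_params(params: Dict[str, Any], conf: Dict[str, Any]) -> Dict[str, Any]:
    """Overlay dag_run.conf on the DAG's params, validating every Param."""
    resolved: Dict[str, Any] = {}
    for name, param in params.items():
        if not isinstance(param, Param):
            # Old-style plain value: conf may override it, nothing to validate.
            resolved[name] = conf.get(name, param)
            continue
        value = conf.get(name, param.default)
        error = _check(value, param.schema)
        if error is not None:
            raise ValueError(f"Invalid input for param {name!r}: {error}")
        resolved[name] = value
    return resolved
```

Because `str_param` has no default, omitting it from the conf fails the `type` check, which is how a "mandatory" param falls out of the schema without a separate flag.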
   
   ### Tasks test via CLI
   ```
   $ airflow tasks test example_complex_params all_param 2021-07-15T08:43:45 -t '{"task_param": true}'
   
   ValueError: True is not of type 'string'
   
   Failed validating 'type' in schema:
       {'type': 'string'}
   
   On instance:
       True    
   ```
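The second error happens because the `-t` value is parsed as JSON, so `true` arrives as Python `True`, which is not a string. When mapping JSON Schema type names onto Python types by hand, note that `bool` is a subclass of `int`, so booleans must be excluded from the numeric types explicitly. A stdlib-only sketch of that mapping (hypothetical helper, not the code `jsonschema` uses internally):

```python
import json
from typing import Any

# JSON Schema primitive type names mapped to Python types (arrays/objects omitted).
_PY_TYPES = {
    "string": str,
    "integer": int,
    "number": (int, float),
    "boolean": bool,
    "null": type(None),
}


def is_json_type(value: Any, json_type: str) -> bool:
    """Check a decoded JSON value against a JSON Schema primitive type name."""
    # Python's bool is a subclass of int, so exclude it from the numeric types explicitly.
    if isinstance(value, bool) and json_type in ("integer", "number"):
        return False
    return isinstance(value, _PY_TYPES[json_type])


# The -t payload from the CLI example, decoded with json.loads.
task_params = json.loads('{"task_param": true}')
```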
   
   Thanks a lot to @ashb & @kaxil for their inputs. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r701800858



##########
File path: airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
##########
@@ -100,15 +99,16 @@ def __init__(
         self.facebook_conn_id = facebook_conn_id
         self.api_version = api_version
         self.fields = fields
-        self.params = params
         self.gzip = gzip
         self.impersonation_chain = impersonation_chain
 
     def execute(self, context: dict):
         service = FacebookAdsReportingHook(
             facebook_conn_id=self.facebook_conn_id, api_version=self.api_version
         )
-        rows = service.bulk_facebook_report(params=self.params, fields=self.fields)
+        rows = service.bulk_facebook_report(
+            params={k: v.resolve() for k, v in self.params.items()}, fields=self.fields
+        )

Review comment:
       We can easily add `additional_requirements` on airflow `2.2.0+` for those two providers, to avoid complicating things. That will enforce a "hard" limit for 2.2, and the constraints will take care of the pre-2.2 default installation. We can also add a check in those new providers' code to raise a warning/exception if they are used in a 2.1 environment.







[GitHub] [airflow] msumit commented on a change in pull request #17100: WIP: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
msumit commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r682326608



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -251,10 +252,12 @@ def post_dag_run(dag_id, session):
         .first()
     )
     if not dagrun_instance:
-        dag_run = DagRun(dag_id=dag_id, run_type=DagRunType.MANUAL, **post_body)
-        session.add(dag_run)
-        session.commit()
-        return dagrun_schema.dump(dag_run)
+        try:
+            dag = current_app.dag_bag.get_dag(dag_id)
+            dag_run = dag.create_dagrun(run_type=DagRunType.MANUAL, state=State.RUNNING, **post_body)

Review comment:
       Because `state` is a required parameter of that function. Will use `QUEUED`.







[GitHub] [airflow] ashb commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r702108848



##########
File path: airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
##########
@@ -100,15 +99,16 @@ def __init__(
         self.facebook_conn_id = facebook_conn_id
         self.api_version = api_version
         self.fields = fields
-        self.params = params
         self.gzip = gzip
         self.impersonation_chain = impersonation_chain
 
     def execute(self, context: dict):
         service = FacebookAdsReportingHook(
             facebook_conn_id=self.facebook_conn_id, api_version=self.api_version
         )
-        rows = service.bulk_facebook_report(params=self.params, fields=self.fields)
+        rows = service.bulk_facebook_report(
+            params={k: v.resolve() for k, v in self.params.items()}, fields=self.fields
+        )

Review comment:
       @msumit By resolving the `task.params` to a plain dict somewhere in this function https://github.com/apache/airflow/blob/2aa443f8d5f0fe9b4b37b9029e9bac5176c563ce/airflow/models/taskinstance.py#L1392-L1416
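One way to read this suggestion: resolve any `Param` objects into their plain values while the task's template context is being built, so operators such as this one only ever see an ordinary dict. A hypothetical stdlib sketch (the `plain_params` name is invented, and `resolve()` here just returns the default without validating):

```python
from typing import Any, Dict, Mapping


class Param:
    """Hypothetical minimal Param holding only a default value."""

    def __init__(self, default: Any = None):
        self.default = default

    def resolve(self) -> Any:
        # A real implementation would validate before returning.
        return self.default


def plain_params(params: Mapping[str, Any]) -> Dict[str, Any]:
    """Resolve Param objects to their values so operators only see plain data."""
    return {k: v.resolve() if isinstance(v, Param) else v for k, v in params.items()}
```

Done once, centrally, this keeps operators like `FacebookAdsReportingToGcsOperator` free of any `resolve()` calls.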







[GitHub] [airflow] ashb commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r702289401



##########
File path: airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
##########
@@ -100,15 +99,16 @@ def __init__(
         self.facebook_conn_id = facebook_conn_id
         self.api_version = api_version
         self.fields = fields
-        self.params = params
         self.gzip = gzip
         self.impersonation_chain = impersonation_chain
 
     def execute(self, context: dict):
         service = FacebookAdsReportingHook(
             facebook_conn_id=self.facebook_conn_id, api_version=self.api_version
         )
-        rows = service.bulk_facebook_report(params=self.params, fields=self.fields)
+        rows = service.bulk_facebook_report(
+            params={k: v.resolve() for k, v in self.params.items()}, fields=self.fields
+        )

Review comment:
       @potiuk :+1: Cool, that'll be useful for other reasons. 







[GitHub] [airflow] msumit commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
msumit commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r702090552



##########
File path: airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
##########
@@ -100,15 +99,16 @@ def __init__(
         self.facebook_conn_id = facebook_conn_id
         self.api_version = api_version
         self.fields = fields
-        self.params = params
         self.gzip = gzip
         self.impersonation_chain = impersonation_chain
 
     def execute(self, context: dict):
         service = FacebookAdsReportingHook(
             facebook_conn_id=self.facebook_conn_id, api_version=self.api_version
         )
-        rows = service.bulk_facebook_report(params=self.params, fields=self.fields)
+        rows = service.bulk_facebook_report(
+            params={k: v.resolve() for k, v in self.params.items()}, fields=self.fields
+        )

Review comment:
       @ashb I would be more than happy not to touch these operators, but unfortunately we have to. I couldn't see any other possible workaround here.







[GitHub] [airflow] msumit commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
msumit commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r707015599



##########
File path: airflow/example_dags/example_complex_params.py
##########
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the complex params."""
+
+from airflow import DAG
+from airflow.models.param import Param
+from airflow.operators.bash import BashOperator
+from airflow.utils.dates import days_ago
+
+with DAG(
+    "example_complex_params",
+    default_args={
+        "owner": "airflow",
+    },
+    params={
+        'int_param': Param(10, type="integer", minimum=0, maximum=20),  # non default int param
+        'str_param': Param(type="string", minLength=2, maxLength=4),  # a mandatory str param
+        'old_param': 'old_way_of_passing',
+        'simple_param': Param('im_just_like_old_param'),  # i.e. no type checking
+        'email_param': Param(
+            'example@example.com', type='string', pattern='\\S+@\\S+\\.\\S+$', minLength=6, maxLength=255
+        ),
+    },
+    schedule_interval=None,
+    start_date=days_ago(1),
+    tags=['example'],
+) as dag:
+    all_params = BashOperator(
+        task_id='all_param',
+        bash_command="echo {{ params.int_param }} {{ params.str_param }} {{ params.old_param }} "
+        "{{ params.simple_param }} {{ params.email_param }} {{ params.task_param }}",

Review comment:
       Getting this error: `jinja2.exceptions.TemplateSyntaxError: unexpected char '!' at 24`







[GitHub] [airflow] msumit commented on a change in pull request #17100: WIP: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
msumit commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r682534162



##########
File path: airflow/models/dag.py
##########
@@ -2312,6 +2327,19 @@ def set_edge_info(self, upstream_task_id: str, downstream_task_id: str, info: Ed
         """
         self.edge_info.setdefault(upstream_task_id, {})[downstream_task_id] = info
 
+    def validate_schedule_and_params(self):
+        for k, v in self.params.items():
+            if isinstance(v, Param):
+                if (
+                    v.default is None
+                    and self.schedule_interval is not None
+                    and ('type' not in v.schema or "null" not in v.schema['type'])
+                ):

Review comment:
       `type` can be an array, hence we are checking `in`
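Because JSON Schema allows `type` to be either a single type name or an array of names, a membership test covers both forms: on a string, `in` is a substring check (and `"null"` does not occur inside any other JSON type name), while on a list it is an element check. A comparison such as `!= "null"` would misjudge `["string", "null"]`. The helper below is a hypothetical, stdlib-only sketch that normalises the value first so the intent is explicit; like the diff, it treats a missing `type` as not nullable.

```python
from typing import Any, Dict


def type_includes_null(schema: Dict[str, Any]) -> bool:
    """True if the schema's "type" (a string or a list of strings) admits null."""
    declared = schema.get("type")
    if declared is None:
        return False  # mirrors the diff: no "type" is not treated as nullable
    types = [declared] if isinstance(declared, str) else list(declared)
    return "null" in types
```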









[GitHub] [airflow] ashb commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r701110458



##########
File path: airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
##########
@@ -100,15 +99,16 @@ def __init__(
         self.facebook_conn_id = facebook_conn_id
         self.api_version = api_version
         self.fields = fields
-        self.params = params
         self.gzip = gzip
         self.impersonation_chain = impersonation_chain
 
     def execute(self, context: dict):
         service = FacebookAdsReportingHook(
             facebook_conn_id=self.facebook_conn_id, api_version=self.api_version
         )
-        rows = service.bulk_facebook_report(params=self.params, fields=self.fields)
+        rows = service.bulk_facebook_report(
+            params={k: v.resolve() for k, v in self.params.items()}, fields=self.fields
+        )

Review comment:
       This use of params was never a ParamsDict, and shouldn't be. This is only meant to be the params explicitly passed to this operator.







[GitHub] [airflow] ashb commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r706137492



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -878,6 +920,34 @@ def from_dict(cls, serialized_obj: dict) -> 'SerializedDAG':
             raise ValueError(f"Unsure how to deserialize version {ver!r}")
         return cls.deserialize_dag(serialized_obj['dag'])
 
+    @classmethod
+    def _serialize_dag_params(cls, dag_params: ParamsDict):
+        """Serialize Params dict for a DAG"""
+        serialized_params = {}
+        for k, v in dag_params.items():
+            # To support backward compatability with some operators, we would let str params
+            # serialized as well, after converting them into a shallow Param object.
+            if v.__class__.__name__ == 'str':

Review comment:
       ```suggestion
               if isinstance(v, str):
   ```
   
   I think you mean Kaxi
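The difference between the two checks, in a stdlib-only sketch: comparing `__class__.__name__` rejects subclasses of `str` and accepts any unrelated class that happens to be called `str`, whereas `isinstance` does neither. `Label` and `NotAString` are hypothetical classes for illustration.

```python
from typing import Any


def is_str_by_class_name(value: Any) -> bool:
    # The check from the diff: compares the class *name*, not the type itself.
    return value.__class__.__name__ == "str"


def is_str(value: Any) -> bool:
    # The suggested check: true for str and any subclass of str.
    return isinstance(value, str)


class Label(str):
    """Hypothetical str subclass, standing in for any str-derived value."""


# A hypothetical unrelated class that merely happens to be named "str".
NotAString = type("str", (), {})
```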







[GitHub] [airflow] msumit commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
msumit commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r701132623



##########
File path: airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
##########
@@ -100,15 +99,16 @@ def __init__(
         self.facebook_conn_id = facebook_conn_id
         self.api_version = api_version
         self.fields = fields
-        self.params = params
         self.gzip = gzip
         self.impersonation_chain = impersonation_chain
 
     def execute(self, context: dict):
         service = FacebookAdsReportingHook(
             facebook_conn_id=self.facebook_conn_id, api_version=self.api_version
         )
-        rows = service.bulk_facebook_report(params=self.params, fields=self.fields)
+        rows = service.bulk_facebook_report(
+            params={k: v.resolve() for k, v in self.params.items()}, fields=self.fields
+        )

Review comment:
       Yup, agreed, but we can't put checks in 10 different places just for these 2 operators. For example, the `validate_schedule_and_params` method in dag.py would complain about `self.params` not being a ParamsDict.







[GitHub] [airflow] ashb commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r701805091



##########
File path: airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
##########
@@ -100,15 +99,16 @@ def __init__(
         self.facebook_conn_id = facebook_conn_id
         self.api_version = api_version
         self.fields = fields
-        self.params = params
         self.gzip = gzip
         self.impersonation_chain = impersonation_chain
 
     def execute(self, context: dict):
         service = FacebookAdsReportingHook(
             facebook_conn_id=self.facebook_conn_id, api_version=self.api_version
         )
-        rows = service.bulk_facebook_report(params=self.params, fields=self.fields)
+        rows = service.bulk_facebook_report(
+            params={k: v.resolve() for k, v in self.params.items()}, fields=self.fields
+        )

Review comment:
       We can do that, but I think in this particular case not having to deal with this at all in the Operator is a better solution.







[GitHub] [airflow] msumit commented on a change in pull request #17100: WIP: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
msumit commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r682325786



##########
File path: airflow/cli/commands/task_command.py
##########
@@ -404,7 +404,14 @@ def task_test(args, dag=None):
     # Add CLI provided task_params to task.params
     if args.task_params:
         passed_in_params = json.loads(args.task_params)
+        for k, v in task.params.items():
+            if k in passed_in_params:
+                v.default = passed_in_params.pop(k)
+                v()  # to raise if there are any validation issues

Review comment:
       We convert every kind of param into a `Param` object (plain values simply get no validations attached), so it'll work.
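A hypothetical sketch of the wrapping described here: a dict subclass converts plain values into schema-less `Param` objects on assignment, so the `v.default = ...; v()` loop from `task_test` works for old-style params too. Names mirror the PR (`Param`, `ParamsDict`), but the bodies are simplified assumptions, not the PR's code.

```python
from typing import Any, Dict, Optional


class Param:
    """Hypothetical minimal Param; an empty schema means nothing to validate."""

    def __init__(self, default: Any = None, **schema: Any):
        self.default = default
        self.schema = schema

    def __call__(self) -> Any:
        # A real implementation would validate self.default against self.schema here.
        return self.default


class ParamsDict(dict):
    """Hypothetical dict that stores every value as a Param.

    Note: dict.update() and setdefault() bypass __setitem__ on subclasses,
    so a fuller version would override those as well.
    """

    def __init__(self, data: Optional[Dict[str, Any]] = None):
        super().__init__()
        for key, value in (data or {}).items():
            self[key] = value  # routed through __setitem__ below

    def __setitem__(self, key: str, value: Any) -> None:
        if not isinstance(value, Param):
            value = Param(value)  # plain value -> schema-less Param
        super().__setitem__(key, value)


# The loop from task_test, run against the wrapped dict.
params = ParamsDict({"old_param": "old_way_of_passing", "int_param": Param(10, type="integer")})
passed_in_params = {"old_param": "new_value"}
for k, v in params.items():
    if k in passed_in_params:
        v.default = passed_in_params.pop(k)
        v()  # would raise on validation issues in a real implementation
```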









[GitHub] [airflow] ashb commented on a change in pull request #17100: WIP: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r681618325



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -251,10 +252,12 @@ def post_dag_run(dag_id, session):
         .first()
     )
     if not dagrun_instance:
-        dag_run = DagRun(dag_id=dag_id, run_type=DagRunType.MANUAL, **post_body)
-        session.add(dag_run)
-        session.commit()
-        return dagrun_schema.dump(dag_run)
+        try:
+            dag = current_app.dag_bag.get_dag(dag_id)
+            dag_run = dag.create_dagrun(run_type=DagRunType.MANUAL, state=State.RUNNING, **post_body)

Review comment:
       Why did we change this call to add a state here?
   
   As of a recent-ish change, only the Scheduler should ever put a DagRun into the RUNNING state; everything else should create/put it in the QUEUED state.

##########
File path: airflow/cli/commands/task_command.py
##########
@@ -404,7 +404,14 @@ def task_test(args, dag=None):
     # Add CLI provided task_params to task.params
     if args.task_params:
         passed_in_params = json.loads(args.task_params)
+        for k, v in task.params.items():
+            if k in passed_in_params:
+                v.default = passed_in_params.pop(k)
+                v()  # to raise if there are any validation issues

Review comment:
       This looks like it would fail for basic/non-validating param types

##########
File path: airflow/example_dags/example_complex_params.py
##########
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the complex params."""
+
+from airflow import DAG
+from airflow.models.param import Param
+from airflow.operators.bash import BashOperator
+from airflow.utils.dates import days_ago
+
+with DAG(
+    "example_complex_params",
+    default_args={
+        "owner": "airflow",
+    },
+    params={
+        'int_param': Param(10, type="integer", minimum=0, maximum=20),  # non default int param
+        'str_param': Param(type="string", minLength=2, maxLength=4),  # a mandatory str param
+        'old_param': 'old_way_of_passing',
+        'simple_param': Param('im_just_like_old_param'),  # i.e. no type checking
+        'email_param': Param(
+            'example@example.com', type='string', pattern='\\S+@\\S+\\.\\S+$', minLength=6, maxLength=255

Review comment:
       Assuming this is a JSON Schema:
   
   ```suggestion
               'example@example.com', type='string', format='idn-email', minLength=5, maxLength=255
   ```
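The original `pattern` is worth a closer look, because JSON Schema does not anchor `pattern` implicitly: it behaves like a regex search, so `\S+@\S+\.\S+$` only constrains the end of the string. The stdlib sketch below assumes those search semantics (Python's `re.search`) and shows that the unanchored pattern accepts text that merely ends in an address; the anchored variant is a hypothetical alternative, not part of the PR. With the `jsonschema` library, the suggested `format` keyword is only enforced when a `FormatChecker` is supplied, which the PR's `param.py` imports.

```python
import re

# The pattern from the example DAG, and a hypothetical fully anchored variant.
PR_PATTERN = r"\S+@\S+\.\S+$"
ANCHORED_PATTERN = r"^\S+@\S+\.\S+$"


def matches_pattern(pattern: str, value: str) -> bool:
    """Apply a JSON-Schema-style "pattern": a search anywhere, not a full match."""
    return re.search(pattern, value) is not None
```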

##########
File path: airflow/models/dag.py
##########
@@ -2312,6 +2327,19 @@ def set_edge_info(self, upstream_task_id: str, downstream_task_id: str, info: Ed
         """
         self.edge_info.setdefault(upstream_task_id, {})[downstream_task_id] = info
 
+    def validate_schedule_and_params(self):

Review comment:
       ```suggestion
       def validate_schedule_and_params(self):
           if self.schedule_interval is None:
               return
   ```
   
   and then we can eliminate one of the conditions below
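Combining the early-return suggestion with the null check from the diff gives roughly the shape below. It is a hypothetical free function (the PR's version is a `DAG` method reading `self.schedule_interval` and `self.params`), and the error text is invented because the quoted diff is cut off before its `raise`.

```python
from typing import Any, Dict, Optional


class Param:
    """Hypothetical minimal Param: a default plus a JSON-Schema-style dict."""

    def __init__(self, default: Any = None, **schema: Any):
        self.default = default
        self.schema = schema


def _type_includes_null(schema: Dict[str, Any]) -> bool:
    declared = schema.get("type")
    if declared is None:
        return False
    types = [declared] if isinstance(declared, str) else list(declared)
    return "null" in types


def validate_schedule_and_params(schedule_interval: Optional[str], params: Dict[str, Any]) -> None:
    """Reject scheduled DAGs whose Params have no default and do not allow null."""
    if schedule_interval is None:
        return  # manually triggered DAGs get their values from the user
    for name, value in params.items():
        if isinstance(value, Param) and value.default is None and not _type_includes_null(value.schema):
            raise ValueError(f"DAG is scheduled but Param {name!r} has no default value")
```

With the early return in place, the `self.schedule_interval is not None` clause inside the loop is no longer needed.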

##########
File path: airflow/models/dag.py
##########
@@ -2312,6 +2327,19 @@ def set_edge_info(self, upstream_task_id: str, downstream_task_id: str, info: Ed
         """
         self.edge_info.setdefault(upstream_task_id, {})[downstream_task_id] = info
 
+    def validate_schedule_and_params(self):
+        for k, v in self.params.items():
+            if isinstance(v, Param):
+                if (
+                    v.default is None
+                    and self.schedule_interval is not None
+                    and ('type' not in v.schema or "null" not in v.schema['type'])
+                ):

Review comment:
       `"null" not in v.schema['type']` looks wrong -- shouldn't it be
   
   ```suggestion
                   if (
                       v.default is None
                       and self.schedule_interval is not None
                       and ('type' not in v.schema or v.schema['type'] != "null")
                   ):
   ```
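JSON Schema allows `type` to be either a single type name or an array of names, for example `["null", "string"]`. This matters when choosing between `in` and `!=`: with the array form, `v.schema['type'] != "null"` is true even though null is accepted. Below is a minimal stdlib sketch of a check that handles both forms. `declares_null_type` is a hypothetical helper name. A missing `type` keyword is treated as "null not declared", which matches the condition in the diff.

```python
from typing import Any, Dict


def declares_null_type(schema: Dict[str, Any]) -> bool:
    """Return True only if the schema's "type" explicitly includes "null"."""
    declared = schema.get("type")
    if declared is None:
        # No "type" keyword: treated as "null not declared", as in the PR's check.
        return False
    if isinstance(declared, str):
        return declared == "null"
    # JSON Schema also allows an array of type names, e.g. ["null", "string"].
    return "null" in declared
```

With this helper, the condition in the diff could read `v.default is None and not declares_null_type(v.schema)`.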
   
   

##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any
+
+from jsonschema import Draft4Validator, FormatChecker
+from jsonschema.exceptions import ValidationError
+
+
+class Param:
+    """
+    Class to hold the default value of a Param and rule set to do the validations
+
+    :param default: The value of this Param object holds
+    :type default: Any
+    :param description: Optional help text for the Param
+    :type description: str
+    :param schema: The validation schema of the Param, if not given then all kwargs except
+    default & description will form the schema
+    :type schema: dict
+    """
+
+    def __init__(self, default: Any = None, description: str = None, **kwargs):
+        self.default = default
+        self.description = description
+        self.schema = kwargs.pop('schema') if 'schema' in kwargs else kwargs
+
+        # If default is not None, then validate it once, may raise ValueError
+        if default:
+            try:
+                validator = Draft4Validator(self.schema, format_checker=FormatChecker())
+                validator.validate(self.default)

Review comment:
       We should check the schema instead, using `Draft7Validator.check_schema` - https://python-jsonschema.readthedocs.io/en/latest/validate/#jsonschema.IValidator.check_schema

##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -789,6 +825,27 @@ def from_dict(cls, serialized_obj: dict) -> 'SerializedDAG':
             raise ValueError(f"Unsure how to deserialize version {ver!r}")
         return cls.deserialize_dag(serialized_obj['dag'])
 
+    @classmethod
+    def _serialize_dag_params(cls, dag_params: Dict[str, Param]):
+        """ """
+        serialized_params = {}
+        for k, v in dag_params.items():
+            serialized_params[k] = v.__dict__
+            serialized_params[k].update({'__type': f'{v.__class__.__module__}.{v.__class__.__name__}'})
+
+        return serialized_params
+
+    @classmethod
+    def _deserialize_dag_params(cls, encoded_dag_params: Dict) -> Dict[str, Param]:
+        """ """
+        op_params = {}
+        for k, v in encoded_dag_params.items():
+            param_class = import_string(v['__type'])

Review comment:
       This is potentially risky (to import any string, and then instantiate an object of that class) -- everywhere else in Airflow we require classes that we deserialize to be registered in some way, to protect against what I've been calling "object inflation attacks". (The canonical example is an old Ruby on Rails bug: https://codeclimate.com/blog/rails-remote-code-execution-vulnerability-explained/ )
   
   Right now this input is just about trusted, but I've been operating on the principle that it isn't and shouldn't be, as either we'll use this elsewhere in Airflow, or open it up to API submission.
   
   I'm not 100% sure what the right answer is here, but right now this goes against what we've done elsewhere in Airflow.
   
   Options I can think of:
   
   1. Don't allow this to be pluggable -- so this is _only_ allowed to be an `airflow.models.param.Param`
   2. Have some kind of registration like we have with OperatorExtraLinks.
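Options 1 and 2 both come down to an allowlist lookup in place of `import_string`. The sketch below shows one way that could look under stated assumptions. `Param` is a hypothetical stand-in for `airflow.models.param.Param`, and `_ALLOWED_PARAM_CLASSES` and `deserialize_param` are illustrative names, not existing Airflow APIs. The serialized layout (`__class`, `default`, `description`, `schema`) is also an assumption.

```python
from typing import Any, Dict, Optional, Type


class Param:
    """Hypothetical stand-in for airflow.models.param.Param (illustration only)."""

    def __init__(self, default: Any = None, description: Optional[str] = None, **schema: Any):
        self.default = default
        self.description = description
        self.schema = schema


# Only classes listed here may be rebuilt from serialized data (hypothetical registry).
_ALLOWED_PARAM_CLASSES: Dict[str, Type[Param]] = {
    "airflow.models.param.Param": Param,
}


def deserialize_param(encoded: Dict[str, Any]) -> Param:
    data = dict(encoded)  # shallow copy so the caller's dict is left untouched
    class_path = data.pop("__class", None)
    param_class = _ALLOWED_PARAM_CLASSES.get(class_path)
    if param_class is None:
        # Never import arbitrary dotted paths coming from serialized input.
        raise ValueError(f"Refusing to deserialize unregistered class {class_path!r}")
    return param_class(
        default=data.get("default"),
        description=data.get("description"),
        **data.get("schema", {}),
    )
```

An unknown class path such as `"os.system"` fails fast with a `ValueError` instead of being imported.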
   
   
   

##########
File path: airflow/models/dag.py
##########
@@ -2312,6 +2327,19 @@ def set_edge_info(self, upstream_task_id: str, downstream_task_id: str, info: Ed
         """
         self.edge_info.setdefault(upstream_task_id, {})[downstream_task_id] = info
 
+    def validate_schedule_and_params(self):

Review comment:
       @uranusjr What is the best way of checking this given the introduction of timetables?
   
   Should this be this?
   
   ```suggestion
       def validate_schedule_and_params(self):
           if isinstance(self.timetable, NullTimetable):
               return
   ```

##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -789,6 +825,27 @@ def from_dict(cls, serialized_obj: dict) -> 'SerializedDAG':
             raise ValueError(f"Unsure how to deserialize version {ver!r}")
         return cls.deserialize_dag(serialized_obj['dag'])
 
+    @classmethod
+    def _serialize_dag_params(cls, dag_params: Dict[str, Param]):
+        """ """
+        serialized_params = {}
+        for k, v in dag_params.items():
+            serialized_params[k] = v.__dict__
+            serialized_params[k].update({'__type': f'{v.__class__.__module__}.{v.__class__.__name__}'})

Review comment:
       Additionally: everywhere else, `__type` holds a value from an enum, so this key should be `__class` or similar.
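A related detail in the quoted hunk: `serialized_params[k] = v.__dict__` does not copy anything. The following `.update(...)` therefore also writes the type key onto the live `Param` object. Below is a minimal sketch that copies first and uses the `__class` key suggested above. `Param` is a hypothetical stand-in and `serialize_params` is an illustrative name.

```python
from typing import Any, Dict, Optional


class Param:
    """Hypothetical minimal stand-in for the PR's Param class."""

    def __init__(self, default: Any = None, description: Optional[str] = None, **schema: Any):
        self.default = default
        self.description = description
        self.schema = schema


def serialize_params(params: Dict[str, Param]) -> Dict[str, Dict[str, Any]]:
    serialized = {}
    for key, param in params.items():
        # dict(vars(param)) is a shallow copy; updating param.__dict__ directly
        # would add the "__class" entry to the Param instance itself.
        entry = dict(vars(param))
        entry["__class"] = f"{type(param).__module__}.{type(param).__qualname__}"
        serialized[key] = entry
    return serialized
```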

##########
File path: airflow/cli/commands/dag_command.py
##########
@@ -104,21 +104,24 @@ def dag_backfill(args, dag=None):
                 dag_run_state=State.NONE,
             )
 
-        dag.run(
-            start_date=args.start_date,
-            end_date=args.end_date,
-            mark_success=args.mark_success,
-            local=args.local,
-            donot_pickle=(args.donot_pickle or conf.getboolean('core', 'donot_pickle')),
-            ignore_first_depends_on_past=args.ignore_first_depends_on_past,
-            ignore_task_deps=args.ignore_dependencies,
-            pool=args.pool,
-            delay_on_limit_secs=args.delay_on_limit,
-            verbose=args.verbose,
-            conf=run_conf,
-            rerun_failed_tasks=args.rerun_failed_tasks,
-            run_backwards=args.run_backwards,
-        )
+        try:
+            dag.run(
+                start_date=args.start_date,
+                end_date=args.end_date,
+                mark_success=args.mark_success,
+                local=args.local,
+                donot_pickle=(args.donot_pickle or conf.getboolean('core', 'donot_pickle')),
+                ignore_first_depends_on_past=args.ignore_first_depends_on_past,
+                ignore_task_deps=args.ignore_dependencies,
+                pool=args.pool,
+                delay_on_limit_secs=args.delay_on_limit,
+                verbose=args.verbose,
+                conf=run_conf,
+                rerun_failed_tasks=args.rerun_failed_tasks,
+                run_backwards=args.run_backwards,
+            )
+        except ValueError as vr:
+            logging.error(str(vr))

Review comment:
       ?

##########
File path: airflow/example_dags/example_complex_params.py
##########
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the complex params."""
+
+from airflow import DAG
+from airflow.models.param import Param
+from airflow.operators.bash import BashOperator
+from airflow.utils.dates import days_ago
+
+with DAG(
+    "example_complex_params",
+    default_args={
+        "owner": "airflow",
+    },

Review comment:
       Not needed; this is already the default.
   
   ```suggestion
   ```

##########
File path: airflow/models/dag.py
##########
@@ -2020,6 +2025,15 @@ def create_dagrun(
                 "Creating DagRun needs either `run_id` or both `run_type` and `execution_date`"
             )
 
+        try:
+            for k, v in copy.deepcopy(self.params).items():
+                if isinstance(v, Param):
+                    if conf and k in conf:
+                        v.default = conf[k]
+                    v()
+        except ValueError as ve:
+            raise ValueError(f"Invalid input for param '{k}': {ve}")

Review comment:
       ```suggestion
               raise ValueError(f"Invalid input for param '{k}': {ve}") from None
   ```
   
   Otherwise we'll see two stack traces here which is likely confusing and doesn't help our users.
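Some background on the `from None` suggestion. Raising inside an `except` block normally chains the new exception to the one being handled, and the traceback prints both ("During handling of the above exception, another exception occurred"). `raise ... from None` sets `__cause__` to `None` and `__suppress_context__` to `True`, so only the new message is displayed. Below is a small stdlib illustration. `parse_int_param` is a hypothetical helper, not Airflow code.

```python
def parse_int_param(key: str, raw: str) -> int:
    try:
        return int(raw)
    except ValueError as err:
        # "from None" hides the original traceback but keeps the clearer message.
        raise ValueError(f"Invalid input for param {key!r}: {err}") from None
```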

##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any
+
+from jsonschema import Draft4Validator, FormatChecker
+from jsonschema.exceptions import ValidationError
+
+
+class Param:

Review comment:
       Hmmm, having airflow.models.param.Param and airflow.models.dagparam.DagParam is confusing -- do we still need both?
   
   Probably, as they do different things, but the naming is going to lead to confusion 🤔 

##########
File path: airflow/example_dags/example_complex_params.py
##########
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the complex params."""
+
+from airflow import DAG
+from airflow.models.param import Param
+from airflow.operators.bash import BashOperator
+from airflow.utils.dates import days_ago
+
+with DAG(
+    "example_complex_params",
+    default_args={
+        "owner": "airflow",
+    },
+    params={
+        'int_param': Param(10, type="integer", minimum=0, maximum=20),  # non default int param
+        'str_param': Param(type="string", minLength=2, maxLength=4),  # a mandatory str param
+        'old_param': 'old_way_of_passing',
+        'simple_param': Param('im_just_like_old_param'),  # i.e. no type checking
+        'email_param': Param(
+            'example@example.com', type='string', pattern='\\S+@\\S+\\.\\S+$', minLength=6, maxLength=255
+        ),
+    },
+    schedule_interval=None,
+    start_date=days_ago(1),
+    tags=['example'],
+) as dag:
+    all_params = BashOperator(
+        task_id='all_param',
+        bash_command="echo {{ params.int_param }} {{ params.str_param }} {{ params.old_param }} "
+        "{{ params.simple_param }} {{ params.email_param }} {{ params.task_param }}",

Review comment:
       This is prone to escaping bugs:
   
   ```suggestion
           bash_command="echo {{ params.int_param!r }} {{ params.str_param!r }} {{ params.old_param!r }} "
           "{{ params.simple_param!r }} {{ params.email_param!r }} {{ params.task_param!r }}",
   ```
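A hedged alternative, assuming the command string is assembled in Python rather than rendered inside the Jinja template: the standard library's `shlex.quote` wraps each value so that bash treats it as a single literal word. Whether this fits the example DAG depends on where the values are rendered. `build_echo_command` is a hypothetical helper, and the values in the test are made up.

```python
import shlex
from typing import Any, Dict


def build_echo_command(params: Dict[str, Any]) -> str:
    # Quote every value so spaces, quotes and $(...) reach echo as literal text.
    quoted = " ".join(shlex.quote(str(value)) for value in params.values())
    return f"echo {quoted}"
```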

##########
File path: airflow/models/dag.py
##########
@@ -2020,6 +2025,15 @@ def create_dagrun(
                 "Creating DagRun needs either `run_id` or both `run_type` and `execution_date`"
             )
 
+        try:
+            for k, v in copy.deepcopy(self.params).items():
+                if isinstance(v, Param):
+                    if conf and k in conf:
+                        v.default = conf[k]
+                    v()

Review comment:
       We should delay the deepcopy as late as possible:
   ```suggestion
               for k, v in self.params.items():
                   if isinstance(v, Param):
                       if conf and k in conf:
                           v = copy.deepcopy(v)
                           v.default = conf[k]
                       v()
   ```

##########
File path: airflow/models/taskinstance.py
##########
@@ -1749,7 +1761,13 @@ def overwrite_params_with_dag_run_conf(self, params, dag_run):
         """Overwrite Task Params with DagRun.conf"""
         if dag_run and dag_run.conf:
             self.log.debug("Updating task params (%s) with DagRun.conf (%s)", params, dag_run.conf)
-            params.update(dag_run.conf)
+            for k, v in dag_run.conf.items():
+                if k in params:
+                    params[k].default = v

Review comment:
       Same comment here about setting default (we shouldn't)

##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any
+
+from jsonschema import Draft4Validator, FormatChecker
+from jsonschema.exceptions import ValidationError
+
+
+class Param:
+    """
+    Class to hold the default value of a Param and rule set to do the validations
+
+    :param default: The value of this Param object holds
+    :type default: Any
+    :param description: Optional help text for the Param
+    :type description: str
+    :param schema: The validation schema of the Param, if not given then all kwargs except
+    default & description will form the schema
+    :type schema: dict
+    """
+
+    def __init__(self, default: Any = None, description: str = None, **kwargs):
+        self.default = default
+        self.description = description
+        self.schema = kwargs.pop('schema') if 'schema' in kwargs else kwargs
+
+        # If default is not None, then validate it once, may raise ValueError
+        if default:
+            try:
+                validator = Draft4Validator(self.schema, format_checker=FormatChecker())

Review comment:
       Draft 4 is old; we should be using 2019-09 or the latest draft that the jsonschema module supports (Draft 7?)

##########
File path: airflow/models/taskinstance.py
##########
@@ -1522,7 +1523,7 @@ def is_eligible_to_retry(self):
         return self.task.retries and self.try_number <= self.max_tries
 
     @provide_session
-    def get_template_context(self, session=None) -> Context:
+    def get_template_context(self, ignore_param_exceptions=True, session=None) -> Context:

Review comment:
       Why is the default to ignore exceptions?

##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any
+
+from jsonschema import Draft4Validator, FormatChecker
+from jsonschema.exceptions import ValidationError
+
+
+class Param:
+    """
+    Class to hold the default value of a Param and rule set to do the validations
+
+    :param default: The value of this Param object holds
+    :type default: Any
+    :param description: Optional help text for the Param
+    :type description: str
+    :param schema: The validation schema of the Param, if not given then all kwargs except
+    default & description will form the schema
+    :type schema: dict
+    """
+
+    def __init__(self, default: Any = None, description: str = None, **kwargs):
+        self.default = default
+        self.description = description
+        self.schema = kwargs.pop('schema') if 'schema' in kwargs else kwargs
+
+        # If default is not None, then validate it once, may raise ValueError
+        if default:
+            try:
+                validator = Draft4Validator(self.schema, format_checker=FormatChecker())
+                validator.validate(self.default)
+            except ValidationError as err:
+                raise ValueError(err)
+
+    def __call__(self, suppress_exception: bool = False) -> Any:

Review comment:
       Using self.default in here for _the actual value_ seems odd -- I would have expected this instead:
   
   ```suggestion
       def __call__(self, value: Optional[T], suppress_exception: bool = False) -> T:
   ```
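Here is a minimal sketch of what a value-taking method could look like. It borrows the `resolve(value, suppress_exception)` signature that appears later in this thread. The stored default is never overwritten: an explicit value wins, and otherwise the default is used. This is not the PR's implementation. The `validator` callable is a hypothetical stand-in for JSON Schema validation.

```python
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class Param(Generic[T]):
    """Hypothetical sketch: callers pass the value in; self.default is never modified."""

    def __init__(self, default: Optional[T] = None, validator: Optional[Callable[[T], bool]] = None):
        self.default = default
        # Hypothetical stand-in for JSON Schema validation.
        self._validator = validator

    def resolve(self, value: Optional[T] = None, suppress_exception: bool = False) -> Optional[T]:
        final = value if value is not None else self.default
        # With a validator, a missing value (None) is treated as invalid, i.e. mandatory.
        if self._validator is not None and (final is None or not self._validator(final)):
            if suppress_exception:
                return None
            raise ValueError(f"Invalid value: {final!r}")
        return final
```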

##########
File path: tests/models/test_dagparam.py
##########
@@ -76,7 +76,7 @@ def return_num(num):
 
             xcom_arg = return_num(value)
 
-        assert dag.params['value'] == self.VALUE
+        assert dag.params['value']() == self.VALUE

Review comment:
       Hmmmm, I wonder if this makes it a breaking change.

##########
File path: airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
##########
@@ -85,7 +85,6 @@ def __init__(
         bucket_name: str,
         object_name: str,
         fields: List[str],
-        params: Dict[str, Any],

Review comment:
       I _suspect_ that this params was meant to be different to the params from the base operator and this is a name collision (that until now hasn't caused any problems)

##########
File path: airflow/models/dag.py
##########
@@ -2020,6 +2025,15 @@ def create_dagrun(
                 "Creating DagRun needs either `run_id` or both `run_type` and `execution_date`"
             )
 
+        try:
+            for k, v in copy.deepcopy(self.params).items():
+                if isinstance(v, Param):
+                    if conf and k in conf:
+                        v.default = conf[k]
+                    v()

Review comment:
       Actually -- we shouldn't ever change v.default. Overwriting the default with the provided value seems like an odd pattern.
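If the conf value is passed into the validation call instead of being assigned to `default`, the `create_dagrun` loop no longer mutates anything, and the `deepcopy` becomes unnecessary. The sketch below rests on assumptions: `Param` is a hypothetical minimal class with a value-taking `resolve()` and a callable `validator` standing in for JSON Schema, and `validate_conf` is an illustrative name.

```python
from typing import Any, Callable, Dict, Optional


class Param:
    """Hypothetical minimal Param whose resolve() takes the value as an argument."""

    def __init__(self, default: Any = None, validator: Optional[Callable[[Any], bool]] = None):
        self.default = default
        self._validator = validator

    def resolve(self, value: Any = None) -> Any:
        final = self.default if value is None else value
        if self._validator is not None and not self._validator(final):
            raise ValueError(f"{final!r} failed validation")
        return final


def validate_conf(params: Dict[str, Any], conf: Optional[Dict[str, Any]] = None) -> None:
    conf = conf or {}
    for key, param in params.items():
        if not isinstance(param, Param):
            continue  # old-style plain values are not validated
        try:
            # The user value is passed in and param.default is never modified,
            # so no deepcopy of the DAG's params is needed. In this sketch a conf
            # value of None falls back to the default.
            param.resolve(conf.get(key))
        except ValueError as err:
            raise ValueError(f"Invalid input for param {key!r}: {err}") from None
```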

##########
File path: airflow/models/dag.py
##########
@@ -1976,7 +1981,7 @@ def create_dagrun(
         run_id: Optional[str] = None,
         start_date: Optional[datetime] = None,
         external_trigger: Optional[bool] = False,
-        conf: Optional[dict] = None,
+        conf: Optional[dict] = {},

Review comment:
       This is only a problem if/when `dagrun.conf` is ever edited in place (by adding keys etc) -- and given we can't tell where `dagrun.conf` might be used elsewhere in the codebase it would be safer to change this, because while it might not cause problems _right now_ it might bite us later.
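The concern above is Python's mutable-default pitfall: a default such as `{}` is evaluated once, when the function is defined, and is then shared by every call that omits the argument. Below is a small stdlib illustration. `create_run_shared` and `create_run_safe` are hypothetical functions, not Airflow code.

```python
from typing import Optional


def create_run_shared(conf: dict = {}) -> dict:  # anti-pattern: one dict shared by all calls
    conf.setdefault("runs", 0)
    conf["runs"] += 1
    return conf


def create_run_safe(conf: Optional[dict] = None) -> dict:
    conf = {} if conf is None else conf  # a fresh dict for each call that omits conf
    conf.setdefault("runs", 0)
    conf["runs"] += 1
    return conf
```

Two calls to `create_run_shared()` return the same dict, and the count is already 2 after the second call. `create_run_safe()` starts fresh each time.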

##########
File path: airflow/www/views.py
##########
@@ -1560,19 +1562,24 @@ def trigger(self, session=None):
                     'airflow/trigger.html', dag_id=dag_id, origin=origin, conf=request_conf, form=form
                 )
 
-        dag = current_app.dag_bag.get_dag(dag_id)
-
         if unpause and dag.is_paused:
             models.DagModel.get_dagmodel(dag_id).set_is_paused(is_paused=False)
 
-        dag.create_dagrun(
-            run_type=DagRunType.MANUAL,
-            execution_date=execution_date,
-            state=State.QUEUED,
-            conf=run_conf,
-            external_trigger=True,
-            dag_hash=current_app.dag_bag.dags_hash.get(dag_id),
-        )
+        try:
+            dag.create_dagrun(
+                run_type=DagRunType.MANUAL,
+                execution_date=execution_date,
+                state=State.RUNNING,

Review comment:
       ```suggestion
                   state=State.QUEUED,
   ```

##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -789,6 +825,27 @@ def from_dict(cls, serialized_obj: dict) -> 'SerializedDAG':
             raise ValueError(f"Unsure how to deserialize version {ver!r}")
         return cls.deserialize_dag(serialized_obj['dag'])
 
+    @classmethod
+    def _serialize_dag_params(cls, dag_params: Dict[str, Param]):
+        """ """
+        serialized_params = {}
+        for k, v in dag_params.items():
+            serialized_params[k] = v.__dict__
+            serialized_params[k].update({'__type': f'{v.__class__.__module__}.{v.__class__.__name__}'})

Review comment:
       Isn't this the same as `v.dump()`?

##########
File path: airflow/models/taskinstance.py
##########
@@ -1590,6 +1591,17 @@ def get_template_context(self, session=None) -> Context:
         if conf.getboolean('core', 'dag_run_conf_overrides_params'):
             self.overwrite_params_with_dag_run_conf(params=params, dag_run=dag_run)
 
+        # Now update params dict with simple values
+        for k, v in params.items():
+            if isinstance(v, Param):
+                try:
+                    params[k] = v()
+                except ValueError:
+                    if ignore_param_exceptions:
+                        params[k] = None
+                    else:
+                        raise

Review comment:
       ```suggestion
               params[k] = v(suppress_exception=ignore_param_exceptions)
   ```

##########
File path: airflow/models/dag.py
##########
@@ -2312,6 +2327,19 @@ def set_edge_info(self, upstream_task_id: str, downstream_task_id: str, info: Ed
         """
         self.edge_info.setdefault(upstream_task_id, {})[downstream_task_id] = info
 
+    def validate_schedule_and_params(self):
+        for k, v in self.params.items():
+            if isinstance(v, Param):
+                if (
+                    v.default is None
+                    and self.schedule_interval is not None
+                    and ('type' not in v.schema or "null" not in v.schema['type'])
+                ):

Review comment:
       And by moving the schedule_interval check up, we can simplify this to one line:
   ```suggestion
                   if v.default is None and ('type' not in v.schema or v.schema['type'] != 'null'):
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] msumit commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
msumit commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r699237132



##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any
+
+from jsonschema import Draft4Validator, FormatChecker
+from jsonschema.exceptions import ValidationError
+
+
+class Param:

Review comment:
       As discussed, moved DagParam to Param as well and added comments. 







[GitHub] [airflow] msumit commented on a change in pull request #17100: Advanced Params using json-schema

msumit commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r701697987



##########
File path: airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
##########
@@ -100,15 +99,16 @@ def __init__(
         self.facebook_conn_id = facebook_conn_id
         self.api_version = api_version
         self.fields = fields
-        self.params = params
         self.gzip = gzip
         self.impersonation_chain = impersonation_chain
 
     def execute(self, context: dict):
         service = FacebookAdsReportingHook(
             facebook_conn_id=self.facebook_conn_id, api_version=self.api_version
         )
-        rows = service.bulk_facebook_report(params=self.params, fields=self.fields)
+        rows = service.bulk_facebook_report(
+            params={k: v.resolve() for k, v in self.params.items()}, fields=self.fields
+        )

Review comment:
       I just tested this: task params do contain the DAG-level params as well, so it would mix a simple params dict with a ParamsDict object, and things would break. On the other hand, I'm still not clear why using the base params instead of class-level params is not a good solution. I agree that the intended use would have been different, but since the naming was messed up, this seems to be a decent fix. As for changing the behavior of operators, it won't, unless someone tries to do something fancy.
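The breakage described above comes from mixing plain values and `Param` objects in one dict. One hedged way to flatten such a mixed dict before handing it to a hook is to resolve the `Param` entries and pass everything else through unchanged. `Param` here is a hypothetical stand-in exposing only `resolve()`, not Airflow's `ParamsDict`, and `flatten_params` is an illustrative name.

```python
from typing import Any, Dict


class Param:
    """Hypothetical stand-in exposing only resolve()."""

    def __init__(self, default: Any = None):
        self.default = default

    def resolve(self) -> Any:
        return self.default


def flatten_params(params: Dict[str, Any]) -> Dict[str, Any]:
    # Param objects are resolved; old-style plain values pass through as-is.
    return {k: v.resolve() if isinstance(v, Param) else v for k, v in params.items()}
```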







[GitHub] [airflow] kaxil commented on a change in pull request #17100: Advanced Params using json-schema

kaxil commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r705732394



##########
File path: airflow/cli/commands/dag_command.py
##########
@@ -106,21 +106,25 @@ def dag_backfill(args, dag=None):
                 dag_run_state=State.NONE,
             )
 
-        dag.run(
-            start_date=args.start_date,
-            end_date=args.end_date,
-            mark_success=args.mark_success,
-            local=args.local,
-            donot_pickle=(args.donot_pickle or conf.getboolean('core', 'donot_pickle')),
-            ignore_first_depends_on_past=args.ignore_first_depends_on_past,
-            ignore_task_deps=args.ignore_dependencies,
-            pool=args.pool,
-            delay_on_limit_secs=args.delay_on_limit,
-            verbose=args.verbose,
-            conf=run_conf,
-            rerun_failed_tasks=args.rerun_failed_tasks,
-            run_backwards=args.run_backwards,
-        )
+        try:
+            dag.run(
+                start_date=args.start_date,
+                end_date=args.end_date,
+                mark_success=args.mark_success,
+                local=args.local,
+                donot_pickle=(args.donot_pickle or conf.getboolean('core', 'donot_pickle')),
+                ignore_first_depends_on_past=args.ignore_first_depends_on_past,
+                ignore_task_deps=args.ignore_dependencies,
+                pool=args.pool,
+                delay_on_limit_secs=args.delay_on_limit,
+                verbose=args.verbose,
+                conf=run_conf,
+                rerun_failed_tasks=args.rerun_failed_tasks,
+                run_backwards=args.run_backwards,
+            )
+        except ValueError as vr:
+            logging.error(str(vr))
+            sys.exit(1)

Review comment:
       Should we print rather than log, like all the other CLI functions?
   
   ```suggestion
           except ValueError as vr:
               print(str(vr))
               sys.exit(1)
   ```
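For reference, `sys.exit` also accepts a string: Python prints it to stderr and exits with status 1. That would collapse the suggested `print` plus `sys.exit(1)` into a single call and keep the error off stdout. Below is a hedged sketch. `run_cli_command` is a hypothetical wrapper, and `action` stands in for the `dag.run(...)` call.

```python
import sys
from typing import Callable


def run_cli_command(action: Callable[[], None]) -> None:
    try:
        action()
    except ValueError as err:
        # Same effect as printing str(err) to stderr, then exiting with status 1.
        sys.exit(str(err))
```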

##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Dict, Optional
+
+import jsonschema
+from jsonschema import FormatChecker
+from jsonschema.exceptions import ValidationError
+
+from airflow.exceptions import AirflowException
+
+
+class Param:
+    """
+    Class to hold the default value of a Param and rule set to do the validations. Without the rule set
+    it always validates and returns the default value.
+
+    :param default: The value of this Param object holds
+    :type default: Any
+    :param description: Optional help text for the Param
+    :type description: str
+    :param schema: The validation schema of the Param, if not given then all kwargs except
+        default & description will form the schema
+    :type schema: dict
+    """
+
+    def __init__(self, default: Any = None, description: str = None, **kwargs):
+        self.default = default
+        self.description = description
+        self.schema = kwargs.pop('schema') if 'schema' in kwargs else kwargs
+
+        # If default is not None, then validate it once, may raise ValueError
+        if default:
+            try:
+                jsonschema.validate(self.default, self.schema, format_checker=FormatChecker())
+            except ValidationError as err:
+                raise ValueError(err)
+
+    def resolve(self, value: Optional[Any] = None, suppress_exception: bool = False) -> Any:
+        """
+        Runs the validations and returns the Param's final value.
+        May raise ValueError on failed validations.
+
+        :param value: The value to be updated for the Param
+        :type: Optional[Any]

Review comment:
       ```suggestion
           :type value: Optional[Any]
   ```

##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -710,6 +721,34 @@ def _serialize_operator_extra_links(cls, operator_extra_links: Iterable[BaseOper
 
         return serialize_operator_extra_links
 
+    @classmethod
+    def _deserialize_operator_params(cls, encoded_op_params: Dict) -> Dict[str, Param]:
+        """Deserialize Params dict of a operator"""
+        op_params = {}
+        for k, v in encoded_op_params.items():
+            param_class = import_string(v['__class'])
+            del v['__class']
+            op_params[k] = param_class(**v)
+
+        return ParamsDict(op_params)
+
+    @classmethod
+    def _serialize_operator_params(cls, op_params: ParamsDict):
+        """Serialize Params dict of a operator"""
+        serialized_params = {}
+        for k, v in op_params.items():
+            # To support backward compatability with some operators, we would let str params
+            # serialized as well, after converting them into a shallow Param object.
+            if v.__class__.__name__ == 'str':

Review comment:
       nit: Can we do the following instead?
   
   ```suggestion
               if isinstance(v, str):
   ```
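   The two checks differ in more than style. The quick sketch below uses illustrative helper names. It shows that `isinstance` needs the type object `str` rather than the string `'str'`. It also shows that `isinstance` accepts `str` subclasses, which the class-name comparison misses.

```python
class TemplateStr(str):
    """Hypothetical str subclass, standing in for any string-like value."""


def is_str_by_class_name(v) -> bool:
    # Current check: compares the class *name*, so str subclasses are missed.
    return v.__class__.__name__ == 'str'


def is_str(v) -> bool:
    # Idiomatic check: pass the type object ``str``, not the string ``'str'``.
    return isinstance(v, str)


def string_second_arg_raises() -> bool:
    """isinstance() requires a type (or tuple of types) as its second argument."""
    try:
        isinstance("x", 'str')
    except TypeError:
        return True
    return False
```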

##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Dict, Optional
+
+import jsonschema
+from jsonschema import FormatChecker
+from jsonschema.exceptions import ValidationError
+
+from airflow.exceptions import AirflowException
+
+
+class Param:
+    """
+    Class to hold the default value of a Param and rule set to do the validations. Without the rule set
+    it always validates and returns the default value.
+
+    :param default: The value of this Param object holds
+    :type default: Any
+    :param description: Optional help text for the Param
+    :type description: str
+    :param schema: The validation schema of the Param, if not given then all kwargs except
+        default & description will form the schema
+    :type schema: dict
+    """
+
+    def __init__(self, default: Any = None, description: str = None, **kwargs):
+        self.default = default
+        self.description = description
+        self.schema = kwargs.pop('schema') if 'schema' in kwargs else kwargs
+
+        # If default is not None, then validate it once, may raise ValueError
+        if default:
+            try:
+                jsonschema.validate(self.default, self.schema, format_checker=FormatChecker())
+            except ValidationError as err:
+                raise ValueError(err)
+
+    def resolve(self, value: Optional[Any] = None, suppress_exception: bool = False) -> Any:
+        """
+        Runs the validations and returns the Param's final value.
+        May raise ValueError on failed validations.
+
+        :param value: The value to be updated for the Param
+        :type: Optional[Any]
+        :param suppress_exception: To raise an exception or not when the validations fails.
+            If true and validations fails, the return value would be None.
+        :type suppress_exception: bool
+        """
+        try:
+            final_val = value or self.default
+            jsonschema.validate(final_val, self.schema, format_checker=FormatChecker())
+            self.default = final_val
+        except ValidationError as err:
+            if suppress_exception:
+                return None
+            raise ValueError(err) from None
+        return final_val
+
+    def dump(self) -> dict:
+        """Dump the Param as a dictionary"""
+        out_dict = {'__class': f'{self.__module__}.{self.__class__.__name__}'}
+        out_dict.update(self.__dict__)
+        return out_dict
+
+
+class ParamsDict(dict):
+    """
+    Class to hold all params for dags or tasks. All the keys are strictly string and values
+    are converted into Param's object if they are not already. This class is to replace param's
+    dictionary implicitly and ideally not needed to be used directly.
+    """
+
+    def __init__(self, dict_obj: Optional[Dict] = None, suppress_exception: bool = False):
+        """
+        Init override for ParamsDict
+        :param dict_obj: A dict or dict like object to init ParamsDict
+        :type dict_obj: Optional[dict]
+        :param suppress_exception: Flag to suppress value exceptions while initializing the ParamsDict
+        :type suppress_exception: bool
+        """
+        params_dict = {}
+        dict_obj = dict_obj or {}
+        for k, v in dict_obj.items():
+            if not isinstance(v, Param):
+                params_dict[k] = Param(v)
+            else:
+                params_dict[k] = v
+        dict.__init__(self, params_dict)
+        self.suppress_exception = suppress_exception
+
+    def __setitem__(self, key: str, value: Any) -> None:
+        """
+        Override for dictionary's ``setitem`` method. This method make sure that all values are of
+        Param's type only.
+
+        :param key: A key which needs to be inserted or updated in the dict
+        :type key: str
+        :param value: A value which needs to be set against the key. It could be of any
+            type but will be converted and stored as a Param object eventually.
+        :type value: Any
+        """
+        try:
+            param = dict.__getitem__(self, key)  # check if the param is in dict already
+            # if the new value is of Param type, then just use it otherwise call resolve on it
+            if isinstance(value, Param):
+                param = value
+            else:
+                param.resolve(value=value, suppress_exception=self.suppress_exception)
+        except KeyError:
+            # if the key isn't there already and if the value is of Param type,
+            # then use it otherwise create a new Param object
+            param = value if isinstance(value, Param) else Param(value)
+
+        dict.__setitem__(self, key, param)
+
+    def __getitem__(self, key: str) -> Any:
+        """
+        Override for dictionary's ``getitem`` method. After fetching the key, it would call the
+        resolve method as well on the Param object.
+
+        :param key: The key to fetch
+        :type key: str
+        """
+        param = dict.__getitem__(self, key)
+        return param.resolve(suppress_exception=self.suppress_exception)
+
+    def dump(self) -> dict:
+        """Dumps the ParamsDict object as a dictionary, while suppressing exceptions"""
+        return {k: v.resolve(suppress_exception=True) for k, v in self.items()}
+
+    def update(self, other_dict: dict) -> None:
+        """
+        Override for dictionary's update method.
+        :param other_dict: A dict type object which needs to be merged in the ParamsDict object
+        :type other_dict: dict
+        """
+        try:
+            for k, v in other_dict.items():
+                self.__setitem__(k, v)
+        except ValueError as ve:
+            raise ValueError(f'Invalid input for param {k}: {ve}') from None
+
+    def validate(self) -> None:
+        """Validates & returns all the Params object stored in the dictionary"""
+        resolved_dict = {}
+        try:
+            for k, v in dict.items(self):
+                resolved_dict[k] = v.resolve(suppress_exception=self.suppress_exception)
+        except ValueError as ve:
+            raise ValueError(f'Invalid input for param {k}: {ve}') from None
+
+        return resolved_dict
+
+
+class DagParam:
+    """
+    Class that represents a DAG run parameter & binds a simple Param object to a name within a DAG instance,
+    so that it can be resolved during the run time via {{ context }} dictionary. The ideal use case of this
+    class is to implicitly convert args passed to a method which is being decorated by @dag keyword.
+
+    It can be used to parameterize your dags. You can overwrite its value by setting it on conf
+    when you trigger your DagRun.
+
+    This can also be used in templates by accessing {{context.params}} dictionary.

Review comment:
       ```suggestion
       This can also be used in templates by accessing ``{{context.params}}`` dictionary.
   ```

##########
File path: airflow/cli/commands/task_command.py
##########
@@ -437,6 +437,9 @@ def task_test(args, dag=None):
     if args.task_params:
         passed_in_params = json.loads(args.task_params)
         task.params.update(passed_in_params)
+
+    task.params.validate()

Review comment:
       Should we always validate, or can we do the following?
   
   ```suggestion
       if task.params:
           task.params.validate()
   ```
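   Assuming `task.params` is always a `ParamsDict`, both forms should behave the same. `ParamsDict` subclasses `dict`, so an empty instance is falsy, and its `validate` loop simply returns `{}` when there is nothing to iterate. The guard therefore only skips a no-op call. The toy illustration below uses `MiniParamsDict`, which is hypothetical and replaces JSON Schema validation with a simple required check.

```python
from typing import Any, Dict


class MiniParamsDict(dict):
    """Hypothetical stand-in for ParamsDict; each value is a (default, required) pair."""

    def validate(self) -> Dict[str, Any]:
        resolved: Dict[str, Any] = {}
        for key, (default, required) in dict.items(self):
            if required and default is None:
                raise ValueError(f"Invalid input for param {key}: None is not allowed")
            resolved[key] = default
        return resolved
```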

##########
File path: airflow/models/dag.py
##########
@@ -2436,6 +2446,21 @@ def set_edge_info(self, upstream_task_id: str, downstream_task_id: str, info: Ed
         """
         self.edge_info.setdefault(upstream_task_id, {})[downstream_task_id] = info
 
+    def validate_schedule_and_params(self):
+        """
+        Validates & raise exception if there are any Params in the DAG which neither have a default value nor
+        have the null in schema['type'] list, but the DAG have a schedule_interval which is not None.
+        """
+        if not self.timetable.can_run:
+            return
+
+        for k, v in self.params.items():
+            # As type can be an array, we would check if `null` is a allowed type or not
+            if v.default is None and ('type' not in v.schema or "null" not in v.schema['type']):
+                raise AirflowException(
+                    'DAG Schedule must be None, if there are any required params without default values'
+                )

Review comment:
       ```suggestion
               if v.default is None and ("type" not in v.schema or "null" not in v.schema["type"]):
                   raise AirflowException(
                       "DAG Schedule must be None, if there are any required params without default values"
                   )
   ```
   
   nit: use double quotes, for consistency within just these 3 lines.
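   The condition reads as "no default, and `null` is not an accepted type". The standalone sketch below extracts just that predicate under a hypothetical function name. It handles both forms JSON Schema allows for `type`: a single string or a list.

```python
from typing import Any, Dict


def is_required_without_default(default: Any, schema: Dict[str, Any]) -> bool:
    """Same condition as in validate_schedule_and_params, extracted for illustration."""
    # For a list, ``in`` is a membership test; for a single type string it is a
    # substring test, which is still correct for JSON Schema's primitive type names.
    return default is None and ("type" not in schema or "null" not in schema["type"])
```

   Note that an untyped `Param()` with no default also counts as required, so it would block a scheduled DAG.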

##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Dict, Optional
+
+import jsonschema
+from jsonschema import FormatChecker
+from jsonschema.exceptions import ValidationError
+
+from airflow.exceptions import AirflowException
+
+
+class Param:
+    """
+    Class to hold the default value of a Param and rule set to do the validations. Without the rule set
+    it always validates and returns the default value.
+
+    :param default: The value of this Param object holds

Review comment:
       ```suggestion
       :param default: The value this Param object holds
   ```

##########
File path: airflow/example_dags/example_complex_params.py
##########
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the complex params."""
+
+from airflow import DAG
+from airflow.models.param import Param
+from airflow.operators.bash import BashOperator
+from airflow.utils.dates import days_ago
+
+with DAG(
+    "example_complex_params",
+    default_args={
+        "owner": "airflow",
+    },
+    params={
+        'int_param': Param(10, type="integer", minimum=0, maximum=20),  # non default int param
+        'str_param': Param(type="string", minLength=2, maxLength=4),  # a mandatory str param
+        'old_param': 'old_way_of_passing',
+        'simple_param': Param('im_just_like_old_param'),  # i.e. no type checking
+        'email_param': Param(
+            'example@example.com', type='string', pattern='\\S+@\\S+\\.\\S+$', minLength=6, maxLength=255
+        ),
+    },
+    schedule_interval=None,
+    start_date=days_ago(1),
+    tags=['example'],
+) as dag:
+    all_params = BashOperator(
+        task_id='all_param',
+        bash_command="echo {{ params.int_param }} {{ params.str_param }} {{ params.old_param }} "
+        "{{ params.simple_param }} {{ params.email_param }} {{ params.task_param }}",

Review comment:
       This is still unresolved.

##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -878,6 +920,34 @@ def from_dict(cls, serialized_obj: dict) -> 'SerializedDAG':
             raise ValueError(f"Unsure how to deserialize version {ver!r}")
         return cls.deserialize_dag(serialized_obj['dag'])
 
+    @classmethod
+    def _serialize_dag_params(cls, dag_params: ParamsDict):
+        """Serialize Params dict for a DAG"""
+        serialized_params = {}
+        for k, v in dag_params.items():
+            # To support backward compatability with some operators, we would let str params
+            # serialized as well, after converting them into a shallow Param object.
+            if v.__class__.__name__ == 'str':

Review comment:
       ```suggestion
               if isinstance(v, str):
   ```

##########
File path: tests/models/test_dag.py
##########
@@ -1735,6 +1744,16 @@ def test_replace_outdated_access_control_actions(self):
             dag.access_control = outdated_permissions
         assert dag.access_control == updated_permissions
 
+    def test_validate_params_on_trigger_dag(self):
+        dag = models.DAG('dummy-dag', schedule_interval=None, params={'param1': Param(type="string")})
+
+        with pytest.raises(ValueError):

Review comment:
       ```suggestion
           with pytest.raises(ValueError, match="Invalid input for param param1: None is not of type 'string'"):
   ```
   
   Probably worth matching on the message, just to make sure it actually shows a useful error.
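   `pytest.raises(..., match=...)` applies `re.search` to the string form of the raised exception, so the pattern is a regular expression rather than a literal. This message happens to contain no metacharacters, but wrapping it in `re.escape` keeps the test exact if it ever does. Below is a rough, pytest-free sketch of that check, using a hypothetical helper named `pytest_style_match`.

```python
import re


def pytest_style_match(pattern: str, exc: BaseException) -> bool:
    """Approximates pytest.raises(match=...): re.search of the pattern against str(exc)."""
    return re.search(pattern, str(exc)) is not None


err = ValueError("Invalid input for param param1: None is not of type 'string'")
```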

##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Dict, Optional
+
+import jsonschema
+from jsonschema import FormatChecker
+from jsonschema.exceptions import ValidationError
+
+from airflow.exceptions import AirflowException
+
+
+class Param:
+    """
+    Class to hold the default value of a Param and rule set to do the validations. Without the rule set
+    it always validates and returns the default value.
+
+    :param default: The value of this Param object holds
+    :type default: Any
+    :param description: Optional help text for the Param
+    :type description: str
+    :param schema: The validation schema of the Param, if not given then all kwargs except
+        default & description will form the schema
+    :type schema: dict
+    """
+
+    def __init__(self, default: Any = None, description: str = None, **kwargs):
+        self.default = default
+        self.description = description
+        self.schema = kwargs.pop('schema') if 'schema' in kwargs else kwargs
+
+        # If default is not None, then validate it once, may raise ValueError
+        if default:
+            try:
+                jsonschema.validate(self.default, self.schema, format_checker=FormatChecker())
+            except ValidationError as err:
+                raise ValueError(err)
+
+    def resolve(self, value: Optional[Any] = None, suppress_exception: bool = False) -> Any:
+        """
+        Runs the validations and returns the Param's final value.
+        May raise ValueError on failed validations.
+
+        :param value: The value to be updated for the Param
+        :type: Optional[Any]
+        :param suppress_exception: To raise an exception or not when the validations fails.
+            If true and validations fails, the return value would be None.
+        :type suppress_exception: bool
+        """
+        try:
+            final_val = value or self.default
+            jsonschema.validate(final_val, self.schema, format_checker=FormatChecker())
+            self.default = final_val
+        except ValidationError as err:
+            if suppress_exception:
+                return None
+            raise ValueError(err) from None
+        return final_val
+
+    def dump(self) -> dict:
+        """Dump the Param as a dictionary"""
+        out_dict = {'__class': f'{self.__module__}.{self.__class__.__name__}'}
+        out_dict.update(self.__dict__)
+        return out_dict
+
+
+class ParamsDict(dict):
+    """
+    Class to hold all params for dags or tasks. All the keys are strictly string and values
+    are converted into Param's object if they are not already. This class is to replace param's
+    dictionary implicitly and ideally not needed to be used directly.
+    """
+
+    def __init__(self, dict_obj: Optional[Dict] = None, suppress_exception: bool = False):
+        """
+        Init override for ParamsDict
+        :param dict_obj: A dict or dict like object to init ParamsDict
+        :type dict_obj: Optional[dict]
+        :param suppress_exception: Flag to suppress value exceptions while initializing the ParamsDict
+        :type suppress_exception: bool
+        """
+        params_dict = {}
+        dict_obj = dict_obj or {}
+        for k, v in dict_obj.items():
+            if not isinstance(v, Param):
+                params_dict[k] = Param(v)
+            else:
+                params_dict[k] = v
+        dict.__init__(self, params_dict)
+        self.suppress_exception = suppress_exception
+
+    def __setitem__(self, key: str, value: Any) -> None:
+        """
+        Override for dictionary's ``setitem`` method. This method make sure that all values are of
+        Param's type only.
+
+        :param key: A key which needs to be inserted or updated in the dict
+        :type key: str
+        :param value: A value which needs to be set against the key. It could be of any
+            type but will be converted and stored as a Param object eventually.
+        :type value: Any
+        """
+        try:
+            param = dict.__getitem__(self, key)  # check if the param is in dict already
+            # if the new value is of Param type, then just use it otherwise call resolve on it
+            if isinstance(value, Param):
+                param = value
+            else:
+                param.resolve(value=value, suppress_exception=self.suppress_exception)
+        except KeyError:
+            # if the key isn't there already and if the value is of Param type,
+            # then use it otherwise create a new Param object
+            param = value if isinstance(value, Param) else Param(value)
+
+        dict.__setitem__(self, key, param)
+
+    def __getitem__(self, key: str) -> Any:
+        """
+        Override for dictionary's ``getitem`` method. After fetching the key, it would call the
+        resolve method as well on the Param object.
+
+        :param key: The key to fetch
+        :type key: str
+        """
+        param = dict.__getitem__(self, key)
+        return param.resolve(suppress_exception=self.suppress_exception)
+
+    def dump(self) -> dict:
+        """Dumps the ParamsDict object as a dictionary, while suppressing exceptions"""
+        return {k: v.resolve(suppress_exception=True) for k, v in self.items()}
+
+    def update(self, other_dict: dict) -> None:
+        """
+        Override for dictionary's update method.
+        :param other_dict: A dict type object which needs to be merged in the ParamsDict object
+        :type other_dict: dict
+        """
+        try:
+            for k, v in other_dict.items():
+                self.__setitem__(k, v)
+        except ValueError as ve:
+            raise ValueError(f'Invalid input for param {k}: {ve}') from None
+
+    def validate(self) -> None:
+        """Validates & returns all the Params object stored in the dictionary"""
+        resolved_dict = {}
+        try:
+            for k, v in dict.items(self):
+                resolved_dict[k] = v.resolve(suppress_exception=self.suppress_exception)
+        except ValueError as ve:
+            raise ValueError(f'Invalid input for param {k}: {ve}') from None
+
+        return resolved_dict
+
+
+class DagParam:
+    """
+    Class that represents a DAG run parameter & binds a simple Param object to a name within a DAG instance,
+    so that it can be resolved during the run time via {{ context }} dictionary. The ideal use case of this

Review comment:
       ```suggestion
       so that it can be resolved during the run time via ``{{ context }}`` dictionary. The ideal use case of this
   ```

##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Dict, Optional
+
+import jsonschema
+from jsonschema import FormatChecker
+from jsonschema.exceptions import ValidationError
+
+from airflow.exceptions import AirflowException
+
+
+class Param:
+    """
+    Class to hold the default value of a Param and rule set to do the validations. Without the rule set
+    it always validates and returns the default value.
+
+    :param default: The value of this Param object holds
+    :type default: Any
+    :param description: Optional help text for the Param
+    :type description: str
+    :param schema: The validation schema of the Param, if not given then all kwargs except
+        default & description will form the schema
+    :type schema: dict
+    """
+
+    def __init__(self, default: Any = None, description: str = None, **kwargs):
+        self.default = default
+        self.description = description
+        self.schema = kwargs.pop('schema') if 'schema' in kwargs else kwargs
+
+        # If default is not None, then validate it once, may raise ValueError
+        if default:
+            try:
+                jsonschema.validate(self.default, self.schema, format_checker=FormatChecker())
+            except ValidationError as err:
+                raise ValueError(err)
+
+    def resolve(self, value: Optional[Any] = None, suppress_exception: bool = False) -> Any:
+        """
+        Runs the validations and returns the Param's final value.
+        May raise ValueError on failed validations.
+
+        :param value: The value to be updated for the Param
+        :type: Optional[Any]
+        :param suppress_exception: To raise an exception or not when the validations fails.
+            If true and validations fails, the return value would be None.
+        :type suppress_exception: bool
+        """
+        try:
+            final_val = value or self.default
+            jsonschema.validate(final_val, self.schema, format_checker=FormatChecker())
+            self.default = final_val
+        except ValidationError as err:
+            if suppress_exception:
+                return None
+            raise ValueError(err) from None
+        return final_val
+
+    def dump(self) -> dict:
+        """Dump the Param as a dictionary"""
+        out_dict = {'__class': f'{self.__module__}.{self.__class__.__name__}'}
+        out_dict.update(self.__dict__)
+        return out_dict
+
+
+class ParamsDict(dict):
+    """
+    Class to hold all params for dags or tasks. All the keys are strictly string and values
+    are converted into Param's object if they are not already. This class is to replace param's
+    dictionary implicitly and ideally not needed to be used directly.
+    """
+
+    def __init__(self, dict_obj: Optional[Dict] = None, suppress_exception: bool = False):
+        """
+        Init override for ParamsDict
+        :param dict_obj: A dict or dict like object to init ParamsDict
+        :type dict_obj: Optional[dict]
+        :param suppress_exception: Flag to suppress value exceptions while initializing the ParamsDict
+        :type suppress_exception: bool
+        """
+        params_dict = {}
+        dict_obj = dict_obj or {}
+        for k, v in dict_obj.items():
+            if not isinstance(v, Param):
+                params_dict[k] = Param(v)
+            else:
+                params_dict[k] = v
+        dict.__init__(self, params_dict)
+        self.suppress_exception = suppress_exception
+
+    def __setitem__(self, key: str, value: Any) -> None:
+        """
+        Override for dictionary's ``setitem`` method. This method make sure that all values are of
+        Param's type only.
+
+        :param key: A key which needs to be inserted or updated in the dict
+        :type key: str
+        :param value: A value which needs to be set against the key. It could be of any
+            type but will be converted and stored as a Param object eventually.
+        :type value: Any
+        """
+        try:
+            param = dict.__getitem__(self, key)  # check if the param is in dict already
+            # if the new value is of Param type, then just use it otherwise call resolve on it
+            if isinstance(value, Param):
+                param = value
+            else:
+                param.resolve(value=value, suppress_exception=self.suppress_exception)
+        except KeyError:
+            # if the key isn't there already and if the value is of Param type,
+            # then use it otherwise create a new Param object
+            param = value if isinstance(value, Param) else Param(value)
+
+        dict.__setitem__(self, key, param)
+
+    def __getitem__(self, key: str) -> Any:
+        """
+        Override for dictionary's ``getitem`` method. After fetching the key, it would call the
+        resolve method as well on the Param object.
+
+        :param key: The key to fetch
+        :type key: str
+        """
+        param = dict.__getitem__(self, key)
+        return param.resolve(suppress_exception=self.suppress_exception)
+
+    def dump(self) -> dict:
+        """Dumps the ParamsDict object as a dictionary, while suppressing exceptions"""
+        return {k: v.resolve(suppress_exception=True) for k, v in self.items()}
+
+    def update(self, other_dict: dict) -> None:
+        """
+        Override for dictionary's update method.
+        :param other_dict: A dict type object which needs to be merged in the ParamsDict object
+        :type other_dict: dict
+        """
+        try:
+            for k, v in other_dict.items():
+                self.__setitem__(k, v)
+        except ValueError as ve:
+            raise ValueError(f'Invalid input for param {k}: {ve}') from None
+
+    def validate(self) -> None:
+        """Validates & returns all the Params object stored in the dictionary"""
+        resolved_dict = {}
+        try:
+            for k, v in dict.items(self):
+                resolved_dict[k] = v.resolve(suppress_exception=self.suppress_exception)
+        except ValueError as ve:
+            raise ValueError(f'Invalid input for param {k}: {ve}') from None
+
+        return resolved_dict
+
+
+class DagParam:
+    """
+    Class that represents a DAG run parameter & binds a simple Param object to a name within a DAG instance,
+    so that it can be resolved during the run time via {{ context }} dictionary. The ideal use case of this
+    class is to implicitly convert args passed to a method which is being decorated by @dag keyword.
+
+    It can be used to parameterize your dags. You can overwrite its value by setting it on conf
+    when you trigger your DagRun.
+
+    This can also be used in templates by accessing {{context.params}} dictionary.
+
+    **Example**:
+
+        with DAG(...) as dag:
+          EmailOperator(subject=dag.param('subject', 'Hi from Airflow!'))
+
+    :param current_dag: Dag being used for parameter.
+    :type current_dag: airflow.models.DAG
+    :param name: key value which is used to set the parameter
+    :type name: str
+    :param default: Default value used if no parameter was set.
+    :type default: Any
+    """
+
+    def __init__(self, current_dag, name: str, default: Optional[Any] = None):
+        if default:
+            current_dag.params[name] = default
+        self._name = name
+        self._default = default
+
+    def resolve(self, context: Dict) -> Any:
+        """Pull DagParam value from DagRun context. This method is run during ``op.execute()``."""
+        default = self._default
+        if not self._default:
+            default = context['params'][self._name] if self._name in context['params'] else None

Review comment:
       ```suggestion
               default = context['params'].get(self._name)
   ```
   
   Simplified representation
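For reference, the conditional lookup in `resolve` and a `dict.get` call return the same result, as long as the key is passed as the variable `self._name` and not as the quoted string `'self._name'`. A minimal standalone sketch (the `lookup_*` helper names are hypothetical, not part of the PR):

```python
from typing import Any, Dict, Optional


def lookup_conditional(params: Dict[str, Any], name: str) -> Optional[Any]:
    # Same expression as in DagParam.resolve above
    return params[name] if name in params else None


def lookup_get(params: Dict[str, Any], name: str) -> Optional[Any]:
    # dict.get returns None when the key is missing
    return params.get(name)


params = {'subject': 'Hi from Airflow!'}
print(lookup_conditional(params, 'subject'))  # Hi from Airflow!
print(lookup_get(params, 'subject'))          # Hi from Airflow!
# Quoting the attribute looks up the literal key 'self._name' instead
print(params.get('self._name'))               # None
```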

##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Dict, Optional
+
+import jsonschema
+from jsonschema import FormatChecker
+from jsonschema.exceptions import ValidationError
+
+from airflow.exceptions import AirflowException
+
+
+class Param:
+    """
+    Class to hold the default value of a Param and rule set to do the validations. Without the rule set
+    it always validates and returns the default value.
+
+    :param default: The value of this Param object holds
+    :type default: Any
+    :param description: Optional help text for the Param
+    :type description: str
+    :param schema: The validation schema of the Param, if not given then all kwargs except
+        default & description will form the schema
+    :type schema: dict
+    """
+
+    def __init__(self, default: Any = None, description: str = None, **kwargs):
+        self.default = default
+        self.description = description
+        self.schema = kwargs.pop('schema') if 'schema' in kwargs else kwargs
+
+        # If default is not None, then validate it once, may raise ValueError
+        if default:
+            try:
+                jsonschema.validate(self.default, self.schema, format_checker=FormatChecker())
+            except ValidationError as err:
+                raise ValueError(err)
+
+    def resolve(self, value: Optional[Any] = None, suppress_exception: bool = False) -> Any:
+        """
+        Runs the validations and returns the Param's final value.
+        May raise ValueError on failed validations.
+
+        :param value: The value to be updated for the Param
+        :type: Optional[Any]
+        :param suppress_exception: To raise an exception or not when the validations fails.
+            If true and validations fails, the return value would be None.
+        :type suppress_exception: bool
+        """
+        try:
+            final_val = value or self.default
+            jsonschema.validate(final_val, self.schema, format_checker=FormatChecker())
+            self.default = final_val
+        except ValidationError as err:
+            if suppress_exception:
+                return None
+            raise ValueError(err) from None
+        return final_val
+
+    def dump(self) -> dict:
+        """Dump the Param as a dictionary"""
+        out_dict = {'__class': f'{self.__module__}.{self.__class__.__name__}'}
+        out_dict.update(self.__dict__)
+        return out_dict
+
+
+class ParamsDict(dict):
+    """
+    Class to hold all params for dags or tasks. All the keys are strictly string and values
+    are converted into Param's object if they are not already. This class is to replace param's
+    dictionary implicitly and ideally not needed to be used directly.
+    """
+
+    def __init__(self, dict_obj: Optional[Dict] = None, suppress_exception: bool = False):
+        """
+        Init override for ParamsDict
+        :param dict_obj: A dict or dict like object to init ParamsDict
+        :type dict_obj: Optional[dict]
+        :param suppress_exception: Flag to suppress value exceptions while initializing the ParamsDict
+        :type suppress_exception: bool
+        """
+        params_dict = {}
+        dict_obj = dict_obj or {}
+        for k, v in dict_obj.items():
+            if not isinstance(v, Param):
+                params_dict[k] = Param(v)
+            else:
+                params_dict[k] = v
+        dict.__init__(self, params_dict)
+        self.suppress_exception = suppress_exception
+
+    def __setitem__(self, key: str, value: Any) -> None:
+        """
+        Override for dictionary's ``setitem`` method. This method make sure that all values are of
+        Param's type only.
+
+        :param key: A key which needs to be inserted or updated in the dict
+        :type key: str
+        :param value: A value which needs to be set against the key. It could be of any
+            type but will be converted and stored as a Param object eventually.
+        :type value: Any
+        """
+        try:
+            param = dict.__getitem__(self, key)  # check if the param is in dict already
+            # if the new value is of Param type, then just use it otherwise call resolve on it
+            if isinstance(value, Param):
+                param = value
+            else:
+                param.resolve(value=value, suppress_exception=self.suppress_exception)
+        except KeyError:
+            # if the key isn't there already and if the value is of Param type,
+            # then use it otherwise create a new Param object
+            param = value if isinstance(value, Param) else Param(value)
+
+        dict.__setitem__(self, key, param)
+
+    def __getitem__(self, key: str) -> Any:
+        """
+        Override for dictionary's ``getitem`` method. After fetching the key, it would call the
+        resolve method as well on the Param object.
+
+        :param key: The key to fetch
+        :type key: str
+        """
+        param = dict.__getitem__(self, key)
+        return param.resolve(suppress_exception=self.suppress_exception)
+
+    def dump(self) -> dict:
+        """Dumps the ParamsDict object as a dictionary, while suppressing exceptions"""
+        return {k: v.resolve(suppress_exception=True) for k, v in self.items()}
+
+    def update(self, other_dict: dict) -> None:
+        """
+        Override for dictionary's update method.
+        :param other_dict: A dict type object which needs to be merged in the ParamsDict object
+        :type other_dict: dict
+        """
+        try:
+            for k, v in other_dict.items():
+                self.__setitem__(k, v)
+        except ValueError as ve:
+            raise ValueError(f'Invalid input for param {k}: {ve}') from None
+
+    def validate(self) -> None:
+        """Validates & returns all the Params object stored in the dictionary"""
+        resolved_dict = {}
+        try:
+            for k, v in dict.items(self):
+                resolved_dict[k] = v.resolve(suppress_exception=self.suppress_exception)
+        except ValueError as ve:
+            raise ValueError(f'Invalid input for param {k}: {ve}') from None
+
+        return resolved_dict
+
+
+class DagParam:
+    """
+    Class that represents a DAG run parameter & binds a simple Param object to a name within a DAG instance,
+    so that it can be resolved during the run time via {{ context }} dictionary. The ideal use case of this
+    class is to implicitly convert args passed to a method which is being decorated by @dag keyword.

Review comment:
       ```suggestion
       class is to implicitly convert args passed to a method which is being decorated by ``@dag`` keyword.
   ```
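One behaviour of the `resolve` method quoted in this hunk is worth keeping in mind: `final_val = value or self.default` falls back to the default for any falsy input, so an explicit `0`, `False` or `''` would be replaced by the default. The `if default:` guard in `__init__` likewise skips validating falsy defaults. A minimal sketch contrasting the two selection rules (the function names are hypothetical, and treating only `None` as "not provided" is just one possible choice):

```python
from typing import Any, Optional


def pick_with_or(value: Optional[Any], default: Any) -> Any:
    # Same selection rule as ``value or self.default`` in the diff
    return value or default


def pick_with_none_check(value: Optional[Any], default: Any) -> Any:
    # Only fall back when no value was supplied at all
    return default if value is None else value


for supplied in (None, 0, False, '', 5):
    print(repr(supplied), pick_with_or(supplied, 10), pick_with_none_check(supplied, 10))
```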

##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Dict, Optional
+
+import jsonschema
+from jsonschema import FormatChecker
+from jsonschema.exceptions import ValidationError
+
+from airflow.exceptions import AirflowException
+
+
+class Param:
+    """
+    Class to hold the default value of a Param and rule set to do the validations. Without the rule set
+    it always validates and returns the default value.
+
+    :param default: The value of this Param object holds
+    :type default: Any
+    :param description: Optional help text for the Param
+    :type description: str
+    :param schema: The validation schema of the Param, if not given then all kwargs except
+        default & description will form the schema
+    :type schema: dict
+    """
+
+    def __init__(self, default: Any = None, description: str = None, **kwargs):
+        self.default = default
+        self.description = description
+        self.schema = kwargs.pop('schema') if 'schema' in kwargs else kwargs
+
+        # If default is not None, then validate it once, may raise ValueError
+        if default:
+            try:
+                jsonschema.validate(self.default, self.schema, format_checker=FormatChecker())
+            except ValidationError as err:
+                raise ValueError(err)
+
+    def resolve(self, value: Optional[Any] = None, suppress_exception: bool = False) -> Any:
+        """
+        Runs the validations and returns the Param's final value.
+        May raise ValueError on failed validations.
+
+        :param value: The value to be updated for the Param
+        :type: Optional[Any]
+        :param suppress_exception: To raise an exception or not when the validations fails.
+            If true and validations fails, the return value would be None.
+        :type suppress_exception: bool
+        """
+        try:
+            final_val = value or self.default
+            jsonschema.validate(final_val, self.schema, format_checker=FormatChecker())
+            self.default = final_val
+        except ValidationError as err:
+            if suppress_exception:
+                return None
+            raise ValueError(err) from None
+        return final_val
+
+    def dump(self) -> dict:
+        """Dump the Param as a dictionary"""
+        out_dict = {'__class': f'{self.__module__}.{self.__class__.__name__}'}
+        out_dict.update(self.__dict__)
+        return out_dict
+
+
+class ParamsDict(dict):
+    """
+    Class to hold all params for dags or tasks. All the keys are strictly string and values
+    are converted into Param's object if they are not already. This class is to replace param's
+    dictionary implicitly and ideally not needed to be used directly.
+    """
+
+    def __init__(self, dict_obj: Optional[Dict] = None, suppress_exception: bool = False):
+        """
+        Init override for ParamsDict
+        :param dict_obj: A dict or dict like object to init ParamsDict
+        :type dict_obj: Optional[dict]
+        :param suppress_exception: Flag to suppress value exceptions while initializing the ParamsDict
+        :type suppress_exception: bool
+        """
+        params_dict = {}
+        dict_obj = dict_obj or {}
+        for k, v in dict_obj.items():
+            if not isinstance(v, Param):
+                params_dict[k] = Param(v)
+            else:
+                params_dict[k] = v
+        dict.__init__(self, params_dict)
+        self.suppress_exception = suppress_exception
+
+    def __setitem__(self, key: str, value: Any) -> None:
+        """
+        Override for dictionary's ``setitem`` method. This method make sure that all values are of
+        Param's type only.
+
+        :param key: A key which needs to be inserted or updated in the dict
+        :type key: str
+        :param value: A value which needs to be set against the key. It could be of any
+            type but will be converted and stored as a Param object eventually.
+        :type value: Any
+        """
+        try:
+            param = dict.__getitem__(self, key)  # check if the param is in dict already
+            # if the new value is of Param type, then just use it otherwise call resolve on it
+            if isinstance(value, Param):
+                param = value
+            else:
+                param.resolve(value=value, suppress_exception=self.suppress_exception)
+        except KeyError:
+            # if the key isn't there already and if the value is of Param type,
+            # then use it otherwise create a new Param object
+            param = value if isinstance(value, Param) else Param(value)
+
+        dict.__setitem__(self, key, param)
+
+    def __getitem__(self, key: str) -> Any:
+        """
+        Override for dictionary's ``getitem`` method. After fetching the key, it would call the
+        resolve method as well on the Param object.
+
+        :param key: The key to fetch
+        :type key: str
+        """
+        param = dict.__getitem__(self, key)
+        return param.resolve(suppress_exception=self.suppress_exception)
+
+    def dump(self) -> dict:
+        """Dumps the ParamsDict object as a dictionary, while suppressing exceptions"""
+        return {k: v.resolve(suppress_exception=True) for k, v in self.items()}
+
+    def update(self, other_dict: dict) -> None:
+        """
+        Override for dictionary's update method.
+        :param other_dict: A dict type object which needs to be merged in the ParamsDict object
+        :type other_dict: dict
+        """
+        try:
+            for k, v in other_dict.items():
+                self.__setitem__(k, v)
+        except ValueError as ve:
+            raise ValueError(f'Invalid input for param {k}: {ve}') from None
+
+    def validate(self) -> None:

Review comment:
       The `-> None` return type does not match L167, since this method returns `resolved_dict`, which is not `None`.
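A sketch of what the annotation could look like if `validate` keeps returning the resolved values. This is a simplified standalone function, not the PR's method; `resolve_one` is a hypothetical callable standing in for `Param.resolve`:

```python
from typing import Any, Callable, Dict


def validate(params: Dict[str, Any], resolve_one: Callable[[Any], Any]) -> Dict[str, Any]:
    """Resolve every param and return the resolved values, keyed by param name."""
    resolved_dict: Dict[str, Any] = {}
    for key, param in params.items():
        try:
            resolved_dict[key] = resolve_one(param)
        except ValueError as err:
            # Prefix the offending key, matching the error message format in the PR
            raise ValueError(f'Invalid input for param {key}: {err}') from None
    return resolved_dict
```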




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r701000560



##########
File path: airflow/providers/google/marketing_platform/operators/display_video.py
##########
@@ -362,15 +362,13 @@ class GoogleDisplayVideo360RunReportOperator(BaseOperator):
 
     template_fields = (
         "report_id",
-        "params",
         "impersonation_chain",
     )
 
     def __init__(
         self,
         *,
         report_id: str,
-        params: Dict[str, Any],

Review comment:
       Same here: it's a name clash, but it has a different meaning/use from the base params.
   
   Maybe both of these should be stored as `self._params`?
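A minimal sketch of the rename being suggested, assuming the operator-specific dict moves to a private attribute so it no longer shadows the `params` managed by the base class. `BaseOperatorStub` and `ReportOperator` are hypothetical stand-ins, not Airflow classes. Note this only separates the attributes; the constructor argument still shares the `params` name:

```python
from typing import Any, Dict, Optional


class BaseOperatorStub:
    """Hypothetical stand-in for a base class that owns ``self.params``."""

    def __init__(self, params: Optional[Dict[str, Any]] = None) -> None:
        self.params = dict(params or {})


class ReportOperator(BaseOperatorStub):
    """Hypothetical operator whose own ``params`` have a different meaning."""

    def __init__(self, *, report_id: str, params: Dict[str, Any], **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.report_id = report_id
        # Stored privately so the base-class ``self.params`` is left untouched
        self._params = params


op = ReportOperator(report_id='r1', params={'filter': 'country'})
print(op.params, op._params)
```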







[GitHub] [airflow] ashb commented on a change in pull request #17100: WIP: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r682515041



##########
File path: airflow/cli/commands/dag_command.py
##########
@@ -104,21 +104,24 @@ def dag_backfill(args, dag=None):
                 dag_run_state=State.NONE,
             )
 
-        dag.run(
-            start_date=args.start_date,
-            end_date=args.end_date,
-            mark_success=args.mark_success,
-            local=args.local,
-            donot_pickle=(args.donot_pickle or conf.getboolean('core', 'donot_pickle')),
-            ignore_first_depends_on_past=args.ignore_first_depends_on_past,
-            ignore_task_deps=args.ignore_dependencies,
-            pool=args.pool,
-            delay_on_limit_secs=args.delay_on_limit,
-            verbose=args.verbose,
-            conf=run_conf,
-            rerun_failed_tasks=args.rerun_failed_tasks,
-            run_backwards=args.run_backwards,
-        )
+        try:
+            dag.run(
+                start_date=args.start_date,
+                end_date=args.end_date,
+                mark_success=args.mark_success,
+                local=args.local,
+                donot_pickle=(args.donot_pickle or conf.getboolean('core', 'donot_pickle')),
+                ignore_first_depends_on_past=args.ignore_first_depends_on_past,
+                ignore_task_deps=args.ignore_dependencies,
+                pool=args.pool,
+                delay_on_limit_secs=args.delay_on_limit,
+                verbose=args.verbose,
+                conf=run_conf,
+                rerun_failed_tasks=args.rerun_failed_tasks,
+                run_backwards=args.run_backwards,
+            )
+        except ValueError as vr:
+            logging.error(str(vr))

Review comment:
       Ah yeah.
   
   We should also exit with a non-0 exit code in this case.
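One way to follow this suggestion is to log the error and then raise `SystemExit` with a non-zero code. A minimal sketch, where `run_backfill` and its `run` callable are hypothetical stand-ins for the `dag.run(...)` call in `dag_backfill`; the actual CLI may prefer a different exit mechanism:

```python
import logging
from typing import Callable


def run_backfill(run: Callable[[], None]) -> None:
    """Run the backfill and exit with status 1 if param validation fails."""
    try:
        run()
    except ValueError as err:
        logging.error(str(err))
        # A non-zero status lets shell scripts and schedulers detect the failure
        raise SystemExit(1)
```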







[GitHub] [airflow] msumit commented on a change in pull request #17100: WIP: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
msumit commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r680464184



##########
File path: airflow/models/dag.py
##########
@@ -1976,7 +1981,7 @@ def create_dagrun(
         run_id: Optional[str] = None,
         start_date: Optional[datetime] = None,
         external_trigger: Optional[bool] = False,
-        conf: Optional[dict] = None,
+        conf: Optional[dict] = {},

Review comment:
       @kaxil I think the way we use the `conf` variable keeps it safe from this gotcha. For example, see this:
   ```
   def safe_func(args={}):
       print(args)
   
   def prob_func(arg1={}, arg2={}):
       arg2.update(arg1)
       print(arg2)
   
   if __name__ == '__main__':
       safe_func(args={'a': 1})
       safe_func()
       safe_func(args={'a': 2})
   
       prob_func(arg1={'a': 1})
       prob_func()
       prob_func(arg1={'a': 2})
   ```
   This generates the following output:
   ```
   {'a': 1}
   {}
   {'a': 2}
   {'a': 1}
   {'a': 1}
   {'a': 2}
   ```
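The output above shows that a shared default dict only leaks state when the function mutates it, as `prob_func` does via `arg2.update`. A common way to sidestep the question entirely is to keep `None` as the default and create a fresh dict on each call. A minimal sketch (`safe_prob_func` is a hypothetical rewrite of `prob_func`, not code from the PR):

```python
from typing import Any, Dict, Optional


def safe_prob_func(
    arg1: Optional[Dict[str, Any]] = None,
    arg2: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    # Create fresh dicts per call instead of sharing one default object
    arg1 = {} if arg1 is None else arg1
    arg2 = {} if arg2 is None else arg2
    arg2.update(arg1)
    return arg2


print(safe_prob_func(arg1={'a': 1}))  # {'a': 1}
print(safe_prob_func())               # {}
print(safe_prob_func(arg1={'a': 2}))  # {'a': 2}
```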







[GitHub] [airflow] turbaszek commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r706638994



##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Dict, Optional
+
+import jsonschema
+from jsonschema import FormatChecker
+from jsonschema.exceptions import ValidationError
+
+from airflow.exceptions import AirflowException
+
+
+class Param:
+    """
+    Class to hold the default value of a Param and rule set to do the validations. Without the rule set
+    it always validates and returns the default value.
+
+    :param default: The value of this Param object holds
+    :type default: Any
+    :param description: Optional help text for the Param
+    :type description: str
+    :param schema: The validation schema of the Param, if not given then all kwargs except
+        default & description will form the schema
+    :type schema: dict
+    """
+
+    def __init__(self, default: Any = None, description: str = None, **kwargs):
+        self.default = default
+        self.description = description
+        self.schema = kwargs.pop('schema') if 'schema' in kwargs else kwargs
+
+        # If default is not None, then validate it once, may raise ValueError
+        if default:
+            try:
+                jsonschema.validate(self.default, self.schema, format_checker=FormatChecker())
+            except ValidationError as err:
+                raise ValueError(err)
+
+    def resolve(self, value: Optional[Any] = None, suppress_exception: bool = False) -> Any:
+        """
+        Runs the validations and returns the Param's final value.
+        May raise ValueError on failed validations.
+
+        :param value: The value to be updated for the Param
+        :type: Optional[Any]
+        :param suppress_exception: To raise an exception or not when the validations fails.
+            If true and validations fails, the return value would be None.
+        :type suppress_exception: bool
+        """
+        try:
+            final_val = value or self.default
+            jsonschema.validate(final_val, self.schema, format_checker=FormatChecker())
+            self.default = final_val
+        except ValidationError as err:
+            if suppress_exception:
+                return None
+            raise ValueError(err) from None

Review comment:
       If we are refactoring this code, maybe it would be good to raise `ValidationError` instead of `ValueError`; I think it would be more appropriate. Also, do we need this `from None`? 
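On the `from None` question: in Python, `raise X from None` sets `X.__suppress_context__` so the traceback omits the original exception, while `raise X from err` keeps it as `X.__cause__`. A minimal sketch of both, using a hypothetical `SchemaValidationError` in place of jsonschema's `ValidationError` so it runs without extra dependencies:

```python
class SchemaValidationError(Exception):
    """Hypothetical stand-in for jsonschema.exceptions.ValidationError."""


def check(value: int) -> int:
    if value < 0:
        raise SchemaValidationError(f'{value} is less than the minimum of 0')
    return value


def resolve_hiding_cause(value: int) -> int:
    try:
        return check(value)
    except SchemaValidationError as err:
        # The traceback shows only the ValueError
        raise ValueError(err) from None


def resolve_keeping_cause(value: int) -> int:
    try:
        return check(value)
    except SchemaValidationError as err:
        # The traceback shows the original error as the direct cause
        raise ValueError(err) from err
```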







[GitHub] [airflow] kaxil commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r706138480



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -878,6 +920,34 @@ def from_dict(cls, serialized_obj: dict) -> 'SerializedDAG':
             raise ValueError(f"Unsure how to deserialize version {ver!r}")
         return cls.deserialize_dag(serialized_obj['dag'])
 
+    @classmethod
+    def _serialize_dag_params(cls, dag_params: ParamsDict):
+        """Serialize Params dict for a DAG"""
+        serialized_params = {}
+        for k, v in dag_params.items():
+            # To support backward compatibility with some operators, we would let str params
+            # serialized as well, after converting them into a shallow Param object.
+            if v.__class__.__name__ == 'str':

Review comment:
       oof yes
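The comment being agreed to is not shown in this excerpt. Independently of it, the check `v.__class__.__name__ == 'str'` in the hunk compares only the class name: it misses subclasses of `str` and would accept an unrelated class that happens to be named `str`. A sketch comparing it with `isinstance` (the helper names and the `Label` class are hypothetical):

```python
def is_str_by_name(value: object) -> bool:
    # The check used in the hunk: compares the class name only
    return value.__class__.__name__ == 'str'


def is_str_by_isinstance(value: object) -> bool:
    # Matches str and any subclass of str
    return isinstance(value, str)


class Label(str):
    """Hypothetical str subclass."""


print(is_str_by_name(Label('x')), is_str_by_isinstance(Label('x')))
```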







[GitHub] [airflow] ashb commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r701135611



##########
File path: airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
##########
@@ -100,15 +99,16 @@ def __init__(
         self.facebook_conn_id = facebook_conn_id
         self.api_version = api_version
         self.fields = fields
-        self.params = params
         self.gzip = gzip
         self.impersonation_chain = impersonation_chain
 
     def execute(self, context: dict):
         service = FacebookAdsReportingHook(
             facebook_conn_id=self.facebook_conn_id, api_version=self.api_version
         )
-        rows = service.bulk_facebook_report(params=self.params, fields=self.fields)
+        rows = service.bulk_facebook_report(
+            params={k: v.resolve() for k, v in self.params.items()}, fields=self.fields
+        )

Review comment:
       `validate_schedule_and_params` is on the DAG, not the operator/task.
   
   We can't change this usage like this anyway: it totally changes the behaviour of the operator.







[GitHub] [airflow] msumit commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
msumit commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r701024857



##########
File path: airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
##########
@@ -100,15 +99,16 @@ def __init__(
         self.facebook_conn_id = facebook_conn_id
         self.api_version = api_version
         self.fields = fields
-        self.params = params
         self.gzip = gzip
         self.impersonation_chain = impersonation_chain
 
     def execute(self, context: dict):
         service = FacebookAdsReportingHook(
             facebook_conn_id=self.facebook_conn_id, api_version=self.api_version
         )
-        rows = service.bulk_facebook_report(params=self.params, fields=self.fields)
+        rows = service.bulk_facebook_report(
+            params={k: v.resolve() for k, v in self.params.items()}, fields=self.fields
+        )

Review comment:
       Yeah, these 2 operators should have named it differently, but they didn't. So if we revert this change, the `params` won't get converted to a `ParamsDict` and things will break. However, if we keep this change, things still work fine, as `BaseOperator.params` takes the place of `self.params`. 







[GitHub] [airflow] ashb commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r702402002



##########
File path: airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
##########
@@ -100,15 +99,16 @@ def __init__(
         self.facebook_conn_id = facebook_conn_id
         self.api_version = api_version
         self.fields = fields
-        self.params = params
         self.gzip = gzip
         self.impersonation_chain = impersonation_chain
 
     def execute(self, context: dict):
         service = FacebookAdsReportingHook(
             facebook_conn_id=self.facebook_conn_id, api_version=self.api_version
         )
-        rows = service.bulk_facebook_report(params=self.params, fields=self.fields)
+        rows = service.bulk_facebook_report(
+            params={k: v.resolve() for k, v in self.params.items()}, fields=self.fields
+        )

Review comment:
       @msumit my point is that by the time the operator gets executed, it should just receive the actual, fully resolved params. It shouldn't have to care about type validation or anything like that, as that should already be handled before execution.
   
   Otherwise every single operator that wants to access params at execute time will need to know about ParamsDict and call resolve. This feels like an anti-pattern to me.
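A minimal sketch of the approach described here, assuming params are resolved once before `execute` is called, so the operator only ever sees plain values. `SimpleParam`, `resolve_params` and `execute` are hypothetical names, and `SimpleParam` performs only a type check rather than full JSON Schema validation:

```python
from typing import Any, Dict, Optional


class SimpleParam:
    """Hypothetical Param-like holder: a default plus an optional expected type."""

    def __init__(self, default: Any = None, expected_type: Optional[type] = None) -> None:
        self.default = default
        self.expected_type = expected_type

    def resolve(self, value: Any = None) -> Any:
        final = self.default if value is None else value
        if self.expected_type is not None and not isinstance(final, self.expected_type):
            raise ValueError(f'expected {self.expected_type.__name__}, got {final!r}')
        return final


def resolve_params(params: Dict[str, Any], conf: Dict[str, Any]) -> Dict[str, Any]:
    """Validate everything up front and return a plain dict of values."""
    resolved = {}
    for key, param in params.items():
        if not isinstance(param, SimpleParam):
            param = SimpleParam(param)  # plain values get no validation rules
        resolved[key] = param.resolve(conf.get(key))
    return resolved


def execute(params: Dict[str, Any]) -> str:
    # The operator receives plain values and needs no knowledge of SimpleParam
    return f"report for {params['account']} ({params['days']} days)"


resolved = resolve_params({'account': 'acme', 'days': SimpleParam(7, int)}, conf={'days': 30})
print(execute(resolved))
```

The design choice here is that a validation failure surfaces in `resolve_params`, before any operator code runs.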







[GitHub] [airflow] paantya edited a comment on pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
paantya edited a comment on pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#issuecomment-908325158


   see also 
   https://github.com/apache/airflow/issues/17912





[GitHub] [airflow] potiuk commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r702109422



##########
File path: airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
##########
@@ -100,15 +99,16 @@ def __init__(
         self.facebook_conn_id = facebook_conn_id
         self.api_version = api_version
         self.fields = fields
-        self.params = params
         self.gzip = gzip
         self.impersonation_chain = impersonation_chain
 
     def execute(self, context: dict):
         service = FacebookAdsReportingHook(
             facebook_conn_id=self.facebook_conn_id, api_version=self.api_version
         )
-        rows = service.bulk_facebook_report(params=self.params, fields=self.fields)
+        rows = service.bulk_facebook_report(
+            params={k: v.resolve() for k, v in self.params.items()}, fields=self.fields
+        )

Review comment:
       BTW, the compatibility checker for providers: https://github.com/apache/airflow/pull/18020







[GitHub] [airflow] paantya commented on pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
paantya commented on pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#issuecomment-908325158


   see also 
   https://github.com/apache/airflow/issues/17912





[GitHub] [airflow] ashb commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r701001225



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -346,6 +348,9 @@ def _deserialize(cls, encoded_var: Any) -> Any:
             return {cls._deserialize(v) for v in var}
         elif type_ == DAT.TUPLE:
             return tuple(cls._deserialize(v) for v in var)
+        elif type_ == DAT.PARAM:
+            param_class = import_string(var['_type'])
+            return param_class(**var)

Review comment:
       Isn't this going to have the effect of `Param(_type="param", ...)`, i.e. passing invalid kwargs to the Param constructor?
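A sketch of one way to avoid passing the type marker through, assuming the `{'__class': ..., **attributes}` shape produced by the `dump()` method quoted elsewhere in this PR: copy the encoded dict and pop the marker before calling the constructor. The `PARAM_CLASSES` registry is a hypothetical stand-in for the import-by-path lookup so the example has no dependencies; it is not Airflow's code.

```python
from typing import Any, Dict, Optional

# Hypothetical registry mapping a dotted class path to the class itself; it
# stands in for an import-by-path helper so the sketch has no dependencies.
PARAM_CLASSES: Dict[str, type] = {}


def class_path(cls: type) -> str:
    return f"{cls.__module__}.{cls.__qualname__}"


def register(cls: type) -> type:
    PARAM_CLASSES[class_path(cls)] = cls
    return cls


@register
class Param:
    """Hypothetical minimal Param whose constructor accepts only known arguments."""

    def __init__(self, default: Any = None, description: Optional[str] = None, schema: Optional[Dict] = None):
        self.default = default
        self.description = description
        self.schema = schema or {}

    def dump(self) -> Dict[str, Any]:
        # Same shape as the quoted dump(): a class marker plus the instance attributes.
        out: Dict[str, Any] = {"__class": class_path(type(self))}
        out.update(self.__dict__)
        return out


def deserialize_param(encoded: Dict[str, Any]) -> Param:
    data = dict(encoded)  # copy so the caller's dict is left untouched
    param_class = PARAM_CLASSES[data.pop("__class")]  # strip the marker first
    return param_class(**data)  # only real constructor arguments remain


encoded = Param(5, "row limit", {"type": "integer"}).dump()
restored = deserialize_param(encoded)
print(restored.default, restored.schema)  # 5 {'type': 'integer'}
```

Calling `Param(**encoded)` directly on this stand-in raises `TypeError` for the unexpected `__class` keyword, which is the kind of problem the question above points at.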







[GitHub] [airflow] ashb commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r705677631



##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Dict, Optional
+
+import jsonschema
+from jsonschema import FormatChecker
+from jsonschema.exceptions import ValidationError
+
+from airflow.exceptions import AirflowException
+
+
+class Param:
+    """
+    Class to hold the default value of a Param and rule set to do the validations. Without the rule set
+    it always validates and returns the default value.
+
+    :param default: The value of this Param object holds
+    :type default: Any
+    :param description: Optional help text for the Param
+    :type description: str
+    :param schema: The validation schema of the Param, if not given then all kwargs except
+        default & description will form the schema
+    :type schema: dict
+    """
+
+    def __init__(self, default: Any = None, description: str = None, **kwargs):
+        self.default = default
+        self.description = description
+        self.schema = kwargs.pop('schema') if 'schema' in kwargs else kwargs
+
+        # If default is not None, then validate it once, may raise ValueError
+        if default:
+            try:
+                jsonschema.validate(self.default, self.schema, format_checker=FormatChecker())
+            except ValidationError as err:
+                raise ValueError(err)
+
+    def resolve(self, value: Optional[Any] = None, suppress_exception: bool = False) -> Any:
+        """
+        Runs the validations and returns the Param's final value.
+        May raise ValueError on failed validations.
+
+        :param value: The value to be updated for the Param
+        :type: Optional[Any]
+        :param suppress_exception: To raise an exception or not when the validations fails.
+            If true and validations fails, the return value would be None.
+        :type suppress_exception: bool
+        """
+        try:
+            final_val = value or self.default
+            jsonschema.validate(final_val, self.schema, format_checker=FormatChecker())
+            self.default = final_val
+        except ValidationError as err:
+            if suppress_exception:
+                return None
+            raise ValueError(err) from None
+        return final_val
+
+    def dump(self) -> dict:
+        """Dump the Param as a dictionary"""
+        out_dict = {'__class': f'{self.__module__}.{self.__class__.__name__}'}
+        out_dict.update(self.__dict__)
+        return out_dict
+
+
+class ParamsDict(dict):
+    """
+    Class to hold all params for dags or tasks. All the keys are strictly string and values
+    are converted into Param's object if they are not already. This class is to replace param's
+    dictionary implicitly and ideally not needed to be used directly.
+    """
+
+    def __init__(self, dict_obj: Optional[Dict] = None, suppress_exception: bool = False):
+        """
+        Init override for ParamsDict
+        :param dict_obj: A dict or dict like object to init ParamsDict
+        :type dict_obj: Optional[dict]
+        :param suppress_exception: Flag to suppress value exceptions while initializing the ParamsDict
+        :type suppress_exception: bool
+        """
+        params_dict = {}
+        dict_obj = dict_obj or {}
+        for k, v in dict_obj.items():
+            if not isinstance(v, Param):
+                params_dict[k] = Param(v)
+            else:
+                params_dict[k] = v
+        dict.__init__(self, params_dict)

Review comment:
       ```suggestion
           super().__init__(params_dict)
   ```
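For context on why the values are wrapped in `__init__` at all: `dict.__init__` (like `dict.update`) inserts items directly and does not go through an overridden `__setitem__`, so a dict subclass that wants every value stored as a `Param` has to convert them itself before calling `super().__init__`. A minimal sketch with a stand-in `Param` (hypothetical, not Airflow's class); `CountingDict` only exists to show the bypass.

```python
from typing import Any, Dict, Optional


class Param:
    """Hypothetical stand-in that only stores a default value."""

    def __init__(self, default: Any = None):
        self.default = default


class ParamsDict(dict):
    """Sketch: every stored value is a Param; the parent is initialised via super()."""

    def __init__(self, dict_obj: Optional[Dict[str, Any]] = None):
        wrapped = {
            key: value if isinstance(value, Param) else Param(value)
            for key, value in (dict_obj or {}).items()
        }
        super().__init__(wrapped)


class CountingDict(dict):
    """Counts calls to __setitem__ to show what dict.__init__ bypasses."""

    calls = 0

    def __setitem__(self, key: str, value: Any) -> None:
        CountingDict.calls += 1
        super().__setitem__(key, value)


params = ParamsDict({"limit": 10, "level": Param("ad")})
CountingDict({"a": 1})  # dict.__init__ inserts directly, so calls stays 0
```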

##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Dict, Optional
+
+import jsonschema
+from jsonschema import FormatChecker
+from jsonschema.exceptions import ValidationError
+
+from airflow.exceptions import AirflowException
+
+
+class Param:
+    """
+    Class to hold the default value of a Param and rule set to do the validations. Without the rule set
+    it always validates and returns the default value.
+
+    :param default: The value of this Param object holds
+    :type default: Any
+    :param description: Optional help text for the Param
+    :type description: str
+    :param schema: The validation schema of the Param, if not given then all kwargs except
+        default & description will form the schema
+    :type schema: dict
+    """
+
+    def __init__(self, default: Any = None, description: str = None, **kwargs):
+        self.default = default
+        self.description = description
+        self.schema = kwargs.pop('schema') if 'schema' in kwargs else kwargs
+
+        # If default is not None, then validate it once, may raise ValueError
+        if default:
+            try:
+                jsonschema.validate(self.default, self.schema, format_checker=FormatChecker())
+            except ValidationError as err:
+                raise ValueError(err)
+
+    def resolve(self, value: Optional[Any] = None, suppress_exception: bool = False) -> Any:
+        """
+        Runs the validations and returns the Param's final value.
+        May raise ValueError on failed validations.
+
+        :param value: The value to be updated for the Param
+        :type: Optional[Any]
+        :param suppress_exception: To raise an exception or not when the validations fails.
+            If true and validations fails, the return value would be None.
+        :type suppress_exception: bool
+        """
+        try:
+            final_val = value or self.default
+            jsonschema.validate(final_val, self.schema, format_checker=FormatChecker())
+            self.default = final_val
+        except ValidationError as err:
+            if suppress_exception:
+                return None
+            raise ValueError(err) from None
+        return final_val
+
+    def dump(self) -> dict:
+        """Dump the Param as a dictionary"""
+        out_dict = {'__class': f'{self.__module__}.{self.__class__.__name__}'}
+        out_dict.update(self.__dict__)
+        return out_dict
+
+
+class ParamsDict(dict):
+    """
+    Class to hold all params for dags or tasks. All the keys are strictly string and values
+    are converted into Param's object if they are not already. This class is to replace param's
+    dictionary implicitly and ideally not needed to be used directly.
+    """
+
+    def __init__(self, dict_obj: Optional[Dict] = None, suppress_exception: bool = False):
+        """
+        Init override for ParamsDict
+        :param dict_obj: A dict or dict like object to init ParamsDict
+        :type dict_obj: Optional[dict]
+        :param suppress_exception: Flag to suppress value exceptions while initializing the ParamsDict
+        :type suppress_exception: bool
+        """
+        params_dict = {}
+        dict_obj = dict_obj or {}
+        for k, v in dict_obj.items():
+            if not isinstance(v, Param):
+                params_dict[k] = Param(v)
+            else:
+                params_dict[k] = v
+        dict.__init__(self, params_dict)
+        self.suppress_exception = suppress_exception
+
+    def __setitem__(self, key: str, value: Any) -> None:
+        """
+        Override for dictionary's ``setitem`` method. This method make sure that all values are of
+        Param's type only.
+
+        :param key: A key which needs to be inserted or updated in the dict
+        :type key: str
+        :param value: A value which needs to be set against the key. It could be of any
+            type but will be converted and stored as a Param object eventually.
+        :type value: Any
+        """
+        try:
+            param = dict.__getitem__(self, key)  # check if the param is in dict already
+            # if the new value is of Param type, then just use it otherwise call resolve on it
+            if isinstance(value, Param):
+                param = value
+            else:
+                param.resolve(value=value, suppress_exception=self.suppress_exception)
+        except KeyError:
+            # if the key isn't there already and if the value is of Param type,
+            # then use it otherwise create a new Param object
+            param = value if isinstance(value, Param) else Param(value)
+
+        dict.__setitem__(self, key, param)

Review comment:
       ```suggestion
           super().__setitem__(key, param)
   ```

##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Dict, Optional
+
+import jsonschema
+from jsonschema import FormatChecker
+from jsonschema.exceptions import ValidationError
+
+from airflow.exceptions import AirflowException
+
+
+class Param:
+    """
+    Class to hold the default value of a Param and rule set to do the validations. Without the rule set
+    it always validates and returns the default value.
+
+    :param default: The value of this Param object holds
+    :type default: Any
+    :param description: Optional help text for the Param
+    :type description: str
+    :param schema: The validation schema of the Param, if not given then all kwargs except
+        default & description will form the schema
+    :type schema: dict
+    """
+
+    def __init__(self, default: Any = None, description: str = None, **kwargs):
+        self.default = default
+        self.description = description
+        self.schema = kwargs.pop('schema') if 'schema' in kwargs else kwargs
+
+        # If default is not None, then validate it once, may raise ValueError
+        if default:
+            try:
+                jsonschema.validate(self.default, self.schema, format_checker=FormatChecker())
+            except ValidationError as err:
+                raise ValueError(err)
+
+    def resolve(self, value: Optional[Any] = None, suppress_exception: bool = False) -> Any:
+        """
+        Runs the validations and returns the Param's final value.
+        May raise ValueError on failed validations.
+
+        :param value: The value to be updated for the Param
+        :type: Optional[Any]
+        :param suppress_exception: To raise an exception or not when the validations fails.
+            If true and validations fails, the return value would be None.
+        :type suppress_exception: bool
+        """
+        try:
+            final_val = value or self.default
+            jsonschema.validate(final_val, self.schema, format_checker=FormatChecker())
+            self.default = final_val
+        except ValidationError as err:
+            if suppress_exception:
+                return None
+            raise ValueError(err) from None
+        return final_val
+
+    def dump(self) -> dict:
+        """Dump the Param as a dictionary"""
+        out_dict = {'__class': f'{self.__module__}.{self.__class__.__name__}'}
+        out_dict.update(self.__dict__)
+        return out_dict
+
+
+class ParamsDict(dict):
+    """
+    Class to hold all params for dags or tasks. All the keys are strictly string and values
+    are converted into Param's object if they are not already. This class is to replace param's
+    dictionary implicitly and ideally not needed to be used directly.
+    """
+
+    def __init__(self, dict_obj: Optional[Dict] = None, suppress_exception: bool = False):
+        """
+        Init override for ParamsDict
+        :param dict_obj: A dict or dict like object to init ParamsDict
+        :type dict_obj: Optional[dict]
+        :param suppress_exception: Flag to suppress value exceptions while initializing the ParamsDict
+        :type suppress_exception: bool
+        """
+        params_dict = {}
+        dict_obj = dict_obj or {}
+        for k, v in dict_obj.items():
+            if not isinstance(v, Param):
+                params_dict[k] = Param(v)
+            else:
+                params_dict[k] = v
+        dict.__init__(self, params_dict)
+        self.suppress_exception = suppress_exception
+
+    def __setitem__(self, key: str, value: Any) -> None:
+        """
+        Override for dictionary's ``setitem`` method. This method make sure that all values are of
+        Param's type only.
+
+        :param key: A key which needs to be inserted or updated in the dict
+        :type key: str
+        :param value: A value which needs to be set against the key. It could be of any
+            type but will be converted and stored as a Param object eventually.
+        :type value: Any
+        """
+        try:
+            param = dict.__getitem__(self, key)  # check if the param is in dict already
+            # if the new value is of Param type, then just use it otherwise call resolve on it
+            if isinstance(value, Param):
+                param = value
+            else:
+                param.resolve(value=value, suppress_exception=self.suppress_exception)
+        except KeyError:
+            # if the key isn't there already and if the value is of Param type,
+            # then use it otherwise create a new Param object
+            param = value if isinstance(value, Param) else Param(value)

Review comment:
       A slightly simpler version (one that doesn't needlessly fetch the param when there is nothing to check):
   
   ```suggestion
           if isinstance(value, Param):
               param = value
           elif key in self:
               param = dict.__getitem__(self, key)
               param.resolve(value=value, suppress_exception=self.suppress_exception)
           else:
               # if the key isn't there already and if the value isn't of Param type create a new Param object
               param = Param(value)
   ```
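The suggested branch order can be exercised in isolation with a stand-in `Param` whose `resolve()` does a plain `isinstance` check instead of JSON-schema validation (both classes are hypothetical simplifications, and the constructor is reduced to just `suppress_exception`):

```python
from typing import Any, Optional


class Param:
    """Hypothetical stand-in: a default value plus an optional isinstance check."""

    def __init__(self, default: Any = None, expected_type: Optional[type] = None):
        self.default = default
        self.expected_type = expected_type

    def resolve(self, value: Any = None, suppress_exception: bool = False) -> Any:
        final = value if value is not None else self.default
        if self.expected_type is not None and not isinstance(final, self.expected_type):
            if suppress_exception:
                return None
            raise ValueError(f"{final!r} is not a {self.expected_type.__name__}")
        self.default = final  # like the quoted resolve(), keep the accepted value
        return final


class ParamsDict(dict):
    def __init__(self, suppress_exception: bool = False):
        super().__init__()
        self.suppress_exception = suppress_exception

    def __setitem__(self, key: str, value: Any) -> None:
        if isinstance(value, Param):
            param = value  # a Param replaces the entry; no lookup needed
        elif key in self:
            # Existing key, raw value: validate it against the stored Param.
            param = super().__getitem__(key)
            param.resolve(value=value, suppress_exception=self.suppress_exception)
        else:
            param = Param(value)  # new key, raw value: wrap it
        super().__setitem__(key, param)


params = ParamsDict()
params["limit"] = Param(10, int)
params["limit"] = 25
params["note"] = "hello"
print(params["limit"].default, params["note"].default)  # 25 hello
```

The existing entry is only looked up on the one branch that needs it, which is the point of the suggestion.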

##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Dict, Optional
+
+import jsonschema
+from jsonschema import FormatChecker
+from jsonschema.exceptions import ValidationError
+
+from airflow.exceptions import AirflowException
+
+
+class Param:
+    """
+    Class to hold the default value of a Param and rule set to do the validations. Without the rule set
+    it always validates and returns the default value.
+
+    :param default: The value of this Param object holds
+    :type default: Any
+    :param description: Optional help text for the Param
+    :type description: str
+    :param schema: The validation schema of the Param, if not given then all kwargs except
+        default & description will form the schema
+    :type schema: dict
+    """
+
+    def __init__(self, default: Any = None, description: str = None, **kwargs):
+        self.default = default
+        self.description = description
+        self.schema = kwargs.pop('schema') if 'schema' in kwargs else kwargs
+
+        # If default is not None, then validate it once, may raise ValueError
+        if default:
+            try:
+                jsonschema.validate(self.default, self.schema, format_checker=FormatChecker())
+            except ValidationError as err:
+                raise ValueError(err)
+
+    def resolve(self, value: Optional[Any] = None, suppress_exception: bool = False) -> Any:
+        """
+        Runs the validations and returns the Param's final value.
+        May raise ValueError on failed validations.
+
+        :param value: The value to be updated for the Param
+        :type: Optional[Any]
+        :param suppress_exception: To raise an exception or not when the validations fails.
+            If true and validations fails, the return value would be None.
+        :type suppress_exception: bool
+        """
+        try:
+            final_val = value or self.default
+            jsonschema.validate(final_val, self.schema, format_checker=FormatChecker())
+            self.default = final_val
+        except ValidationError as err:
+            if suppress_exception:
+                return None
+            raise ValueError(err) from None
+        return final_val
+
+    def dump(self) -> dict:
+        """Dump the Param as a dictionary"""
+        out_dict = {'__class': f'{self.__module__}.{self.__class__.__name__}'}
+        out_dict.update(self.__dict__)
+        return out_dict
+
+
+class ParamsDict(dict):
+    """
+    Class to hold all params for dags or tasks. All the keys are strictly string and values
+    are converted into Param's object if they are not already. This class is to replace param's
+    dictionary implicitly and ideally not needed to be used directly.
+    """
+
+    def __init__(self, dict_obj: Optional[Dict] = None, suppress_exception: bool = False):
+        """
+        Init override for ParamsDict
+        :param dict_obj: A dict or dict like object to init ParamsDict
+        :type dict_obj: Optional[dict]
+        :param suppress_exception: Flag to suppress value exceptions while initializing the ParamsDict
+        :type suppress_exception: bool
+        """
+        params_dict = {}
+        dict_obj = dict_obj or {}
+        for k, v in dict_obj.items():
+            if not isinstance(v, Param):
+                params_dict[k] = Param(v)
+            else:
+                params_dict[k] = v
+        dict.__init__(self, params_dict)
+        self.suppress_exception = suppress_exception
+
+    def __setitem__(self, key: str, value: Any) -> None:
+        """
+        Override for dictionary's ``setitem`` method. This method make sure that all values are of
+        Param's type only.
+
+        :param key: A key which needs to be inserted or updated in the dict
+        :type key: str
+        :param value: A value which needs to be set against the key. It could be of any
+            type but will be converted and stored as a Param object eventually.
+        :type value: Any
+        """
+        try:
+            param = dict.__getitem__(self, key)  # check if the param is in dict already
+            # if the new value is of Param type, then just use it otherwise call resolve on it
+            if isinstance(value, Param):
+                param = value
+            else:
+                param.resolve(value=value, suppress_exception=self.suppress_exception)
+        except KeyError:
+            # if the key isn't there already and if the value is of Param type,
+            # then use it otherwise create a new Param object
+            param = value if isinstance(value, Param) else Param(value)
+
+        dict.__setitem__(self, key, param)
+
+    def __getitem__(self, key: str) -> Any:
+        """
+        Override for dictionary's ``getitem`` method. After fetching the key, it would call the
+        resolve method as well on the Param object.
+
+        :param key: The key to fetch
+        :type key: str
+        """
+        param = dict.__getitem__(self, key)

Review comment:
       ```suggestion
           param = super().__getitem__(key)
   ```
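A minimal sketch of the suggested `__getitem__`, using a stand-in `Param` (hypothetical). It also shows a caveat of this design: `dict` methods such as `get()` read the underlying storage directly and do not call an overridden `__getitem__`, so they return the raw `Param` rather than the resolved value.

```python
from typing import Any


class Param:
    """Hypothetical stand-in whose resolve() just returns the stored default."""

    def __init__(self, default: Any = None):
        self.default = default

    def resolve(self) -> Any:
        return self.default


class ParamsDict(dict):
    def __getitem__(self, key: str) -> Any:
        # super().__getitem__ fetches the stored Param; calling self[key] here
        # would recurse back into this method.
        param = super().__getitem__(key)
        return param.resolve()


params = ParamsDict({"limit": Param(10)})
print(params["limit"])                         # 10
print(isinstance(params.get("limit"), Param))  # True: dict.get() skips the override
```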







[GitHub] [airflow] ashb commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r706136935



##########
File path: airflow/cli/commands/dag_command.py
##########
@@ -106,21 +106,25 @@ def dag_backfill(args, dag=None):
                 dag_run_state=State.NONE,
             )
 
-        dag.run(
-            start_date=args.start_date,
-            end_date=args.end_date,
-            mark_success=args.mark_success,
-            local=args.local,
-            donot_pickle=(args.donot_pickle or conf.getboolean('core', 'donot_pickle')),
-            ignore_first_depends_on_past=args.ignore_first_depends_on_past,
-            ignore_task_deps=args.ignore_dependencies,
-            pool=args.pool,
-            delay_on_limit_secs=args.delay_on_limit,
-            verbose=args.verbose,
-            conf=run_conf,
-            rerun_failed_tasks=args.rerun_failed_tasks,
-            run_backwards=args.run_backwards,
-        )
+        try:
+            dag.run(
+                start_date=args.start_date,
+                end_date=args.end_date,
+                mark_success=args.mark_success,
+                local=args.local,
+                donot_pickle=(args.donot_pickle or conf.getboolean('core', 'donot_pickle')),
+                ignore_first_depends_on_past=args.ignore_first_depends_on_past,
+                ignore_task_deps=args.ignore_dependencies,
+                pool=args.pool,
+                delay_on_limit_secs=args.delay_on_limit,
+                verbose=args.verbose,
+                conf=run_conf,
+                rerun_failed_tasks=args.rerun_failed_tasks,
+                run_backwards=args.run_backwards,
+            )
+        except ValueError as vr:
+            logging.error(str(vr))
+            sys.exit(1)

Review comment:
       Oh yes, good spot.
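The pattern in this hunk, sketched in isolation: a validation `ValueError` raised while starting the run is logged and turned into exit code 1 instead of an uncaught traceback. `run_dag` and `backfill` are hypothetical stand-ins for `dag.run()` and the CLI command, with a toy check in place of real param validation.

```python
import logging
import sys
from typing import Any, Dict


def run_dag(conf: Dict[str, Any]) -> None:
    """Hypothetical stand-in for dag.run(): rejects a non-integer 'limit'."""
    if not isinstance(conf.get("limit", 0), int):
        raise ValueError(f"Invalid value for 'limit': {conf['limit']!r}")


def backfill(conf: Dict[str, Any]) -> None:
    try:
        run_dag(conf)
    except ValueError as err:
        # Report the validation failure and exit non-zero for shell callers.
        logging.error(str(err))
        sys.exit(1)


backfill({"limit": 3})  # valid conf: returns normally
```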







[GitHub] [airflow] ashb commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r701000150



##########
File path: airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
##########
@@ -100,15 +99,16 @@ def __init__(
         self.facebook_conn_id = facebook_conn_id
         self.api_version = api_version
         self.fields = fields
-        self.params = params
         self.gzip = gzip
         self.impersonation_chain = impersonation_chain
 
     def execute(self, context: dict):
         service = FacebookAdsReportingHook(
             facebook_conn_id=self.facebook_conn_id, api_version=self.api_version
         )
-        rows = service.bulk_facebook_report(params=self.params, fields=self.fields)
+        rows = service.bulk_facebook_report(
+            params={k: v.resolve() for k, v in self.params.items()}, fields=self.fields
+        )

Review comment:
       Yeah, so _this_ `params` is a name clash and shouldn't be using the _same_ params.
   
   It needs to be kept separate from `dag.params`, the typed params, etc.
   
   I.e. this change needs to be reverted.
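One way to read this request, as a sketch: keep the operator's API request parameters under a distinct constructor argument and attribute so they never collide with the `params` managed by the base operator. `BaseOperatorStub`, `ReportToGcsOperator`, and the name `parameters` are hypothetical choices for illustration, not the provider's actual code.

```python
from typing import Any, Dict, List, Optional


class BaseOperatorStub:
    """Hypothetical stand-in for BaseOperator, which owns the DAG/task `params`."""

    def __init__(self, params: Optional[Dict[str, Any]] = None):
        self.params = params or {}


class ReportToGcsOperator(BaseOperatorStub):
    """Hypothetical operator: API request parameters live under a separate name."""

    def __init__(self, fields: List[str], parameters: Optional[Dict[str, Any]] = None, **kwargs):
        super().__init__(**kwargs)
        self.fields = fields
        # Kept apart from self.params, so DAG-level Param handling never touches it.
        self.parameters = parameters or {}

    def build_request(self) -> Dict[str, Any]:
        return {"fields": self.fields, "params": self.parameters}


op = ReportToGcsOperator(["campaign_name"], parameters={"level": "ad"}, params={"run_label": "x"})
```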







[GitHub] [airflow] ashb commented on a change in pull request #17100: WIP: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r681628602



##########
File path: airflow/models/dag.py
##########
@@ -2312,6 +2327,19 @@ def set_edge_info(self, upstream_task_id: str, downstream_task_id: str, info: Ed
         """
         self.edge_info.setdefault(upstream_task_id, {})[downstream_task_id] = info
 
+    def validate_schedule_and_params(self):

Review comment:
       @uranusjr What is the best way of checking this given the introduction of timetables?
   
   Should it be this?
   
   ```suggestion
       def validate_schedule_and_params(self):
           if isinstance(self.timetable, NullTimetable):
               return
   ```







[GitHub] [airflow] uranusjr commented on a change in pull request #17100: WIP: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r685850326



##########
File path: airflow/models/dag.py
##########
@@ -2312,6 +2327,19 @@ def set_edge_info(self, upstream_task_id: str, downstream_task_id: str, info: Ed
         """
         self.edge_info.setdefault(upstream_task_id, {})[downstream_task_id] = info
 
+    def validate_schedule_and_params(self):

Review comment:
       It turns out a lot of code needs this pattern, so I added an interface for it in #17414.
   
   https://github.com/uranusjr/airflow/blob/3e6597b645e62bed299a7c3a7484ddda2f0a3fbc/airflow/timetables/base.py#L86-L98
   
   `dag.timetable` will get two attributes:
   
   * `can_run` is almost always True, only False for `schedule_interval=None`
   * `periodic` is almost always True, only False for `schedule_interval=None` and `schedule_interval="@once"`
   
   So this would become `if not self.timetable.can_run`.
   
   The advantage of this over `isinstance` is that custom timetables can override it if it makes sense (I can’t think of a scenario where it does, but you never know).
   
   Feel free to suggest better names in #17414.
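A sketch of the `can_run` check, assuming (the diff shows only the method name) that the validation's purpose is to reject scheduled DAGs whose params have no default, since nobody supplies values for scheduled runs. `TimetableStub` and `ParamStub` are hypothetical stand-ins exposing just the attributes described above.

```python
from typing import Any, Dict, Optional


class TimetableStub:
    """Hypothetical stand-in exposing the `can_run` flag described above."""

    def __init__(self, can_run: bool):
        self.can_run = can_run


class ParamStub:
    def __init__(self, default: Optional[Any] = None):
        self.default = default


def validate_schedule_and_params(timetable: TimetableStub, params: Dict[str, ParamStub]) -> None:
    # DAGs that never run on a schedule are only triggered manually,
    # so values can be supplied at trigger time.
    if not timetable.can_run:
        return
    missing = [key for key, param in params.items() if param.default is None]
    if missing:
        raise ValueError(f"Scheduled DAG has params without defaults: {missing}")
```

Checking the attribute rather than `isinstance(..., NullTimetable)` lets a custom timetable opt out by setting `can_run` itself, which matches the rationale given above.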







[GitHub] [airflow] msumit merged pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
msumit merged pull request #17100:
URL: https://github.com/apache/airflow/pull/17100


   





[GitHub] [airflow] turbaszek commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r707050988



##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Dict, Optional
+
+import jsonschema
+from jsonschema import FormatChecker
+from jsonschema.exceptions import ValidationError
+
+from airflow.exceptions import AirflowException
+
+
+class Param:
+    """
+    Class to hold the default value of a Param and rule set to do the validations. Without the rule set
+    it always validates and returns the default value.
+
+    :param default: The value of this Param object holds
+    :type default: Any
+    :param description: Optional help text for the Param
+    :type description: str
+    :param schema: The validation schema of the Param, if not given then all kwargs except
+        default & description will form the schema
+    :type schema: dict
+    """
+
+    def __init__(self, default: Any = None, description: str = None, **kwargs):
+        self.default = default
+        self.description = description
+        self.schema = kwargs.pop('schema') if 'schema' in kwargs else kwargs
+
+        # If default is not None, then validate it once, may raise ValueError
+        if default:
+            try:
+                jsonschema.validate(self.default, self.schema, format_checker=FormatChecker())
+            except ValidationError as err:
+                raise ValueError(err)
+
+    def resolve(self, value: Optional[Any] = None, suppress_exception: bool = False) -> Any:
+        """
+        Runs the validations and returns the Param's final value.
+        May raise ValueError on failed validations.
+
+        :param value: The value to be updated for the Param
+        :type: Optional[Any]
+        :param suppress_exception: To raise an exception or not when the validations fails.
+            If true and validations fails, the return value would be None.
+        :type suppress_exception: bool
+        """
+        try:
+            final_val = value or self.default
+            jsonschema.validate(final_val, self.schema, format_checker=FormatChecker())
+            self.default = final_val
+        except ValidationError as err:
+            if suppress_exception:
+                return None
+            raise ValueError(err) from None

Review comment:
       Oh nice, I didn't know about that! 







[GitHub] [airflow] msumit commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
msumit commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r699237132



##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any
+
+from jsonschema import Draft4Validator, FormatChecker
+from jsonschema.exceptions import ValidationError
+
+
+class Param:

Review comment:
       As discussed, moved DagParam to Param as well and added comments. 







[GitHub] [airflow] potiuk commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r701800858



##########
File path: airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
##########
@@ -100,15 +99,16 @@ def __init__(
         self.facebook_conn_id = facebook_conn_id
         self.api_version = api_version
         self.fields = fields
-        self.params = params
         self.gzip = gzip
         self.impersonation_chain = impersonation_chain
 
     def execute(self, context: dict):
         service = FacebookAdsReportingHook(
             facebook_conn_id=self.facebook_conn_id, api_version=self.api_version
         )
-        rows = service.bulk_facebook_report(params=self.params, fields=self.fields)
+        rows = service.bulk_facebook_report(
+            params={k: v.resolve() for k, v in self.params.items()}, fields=self.fields
+        )

Review comment:
       We can easily add an "additional_requirements" entry of Airflow `2.2.0+` to those two providers, to keep things simple. That will enforce 2.2 with a "hard" limit, and constraints will take care of the pre-2.2 default installation.
   
   UPDATE: I thought about it, and we can also add functionality to the Providers Manager to check the versions of community providers and warn if "unsupported" versions are installed. This will not be a "hard" pip limit, but it will give a clear indication that providers should be upgraded in case someone did not use constraints.
   
   This should be rather easy and will allow us to be more "flexible" in case of future "breaking changes" that actually expect Airflow X.Y to use provider Z.Q or above.
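A minimal sketch of the warn-only check described above. It assumes a hypothetical `MIN_PROVIDER_VERSIONS` table with a made-up package name and plain `X.Y.Z` version strings (no pre-release suffixes); how the real Providers Manager would store or compare versions is not specified in this thread.

```python
import warnings
from typing import Dict, List, Tuple

# Hypothetical minimum provider versions that a given Airflow release expects.
MIN_PROVIDER_VERSIONS: Dict[str, str] = {
    "apache-airflow-providers-example": "2.0.0",
}


def _parse(version: str) -> Tuple[int, ...]:
    """Turn a plain 'X.Y.Z' string into a tuple of ints so 2.10.0 > 2.9.0."""
    return tuple(int(part) for part in version.split("."))


def check_provider_versions(installed: Dict[str, str]) -> List[str]:
    """Warn (rather than fail) for each installed provider below its minimum version."""
    problems = []
    for package, min_version in MIN_PROVIDER_VERSIONS.items():
        version = installed.get(package)
        if version is not None and _parse(version) < _parse(min_version):
            message = f"{package}=={version} is unsupported; please upgrade to >={min_version}"
            warnings.warn(message)
            problems.append(message)
    return problems
```

Because it only warns, users who installed without constraints still get a clear signal, while pip is never blocked from installing an older provider.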







[GitHub] [airflow] msumit commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
msumit commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r702412362



##########
File path: airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
##########
@@ -100,15 +99,16 @@ def __init__(
         self.facebook_conn_id = facebook_conn_id
         self.api_version = api_version
         self.fields = fields
-        self.params = params
         self.gzip = gzip
         self.impersonation_chain = impersonation_chain
 
     def execute(self, context: dict):
         service = FacebookAdsReportingHook(
             facebook_conn_id=self.facebook_conn_id, api_version=self.api_version
         )
-        rows = service.bulk_facebook_report(params=self.params, fields=self.fields)
+        rows = service.bulk_facebook_report(
+            params={k: v.resolve() for k, v in self.params.items()}, fields=self.fields
+        )

Review comment:
       @ashb I think this issue arises in these 2 operators because they were (mis)using a variable. The intended use of params is in templated strings, and that is not broken here. I don't think there is any other operator that would or should use params in this way. Otherwise, if some operator tomorrow used the DAG's `start_date` in some weird way, would Airflow have to honor that in the future?







[GitHub] [airflow] ashb commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r702780224



##########
File path: airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
##########
@@ -100,15 +99,16 @@ def __init__(
         self.facebook_conn_id = facebook_conn_id
         self.api_version = api_version
         self.fields = fields
-        self.params = params
         self.gzip = gzip
         self.impersonation_chain = impersonation_chain
 
     def execute(self, context: dict):
         service = FacebookAdsReportingHook(
             facebook_conn_id=self.facebook_conn_id, api_version=self.api_version
         )
-        rows = service.bulk_facebook_report(params=self.params, fields=self.fields)
+        rows = service.bulk_facebook_report(
+            params={k: v.resolve() for k, v in self.params.items()}, fields=self.fields
+        )

Review comment:
       Yes, they were "misusing" the params, but what about this example?
   
   ```python
   @task
   def my_op(params):
      assert isinstance(params['my_param'], str)
   ```
   
   That should still be true (assuming `'my_param': Param(type='string', default='a')`).
   
   My point is that I can't think of any reason for an Operator's execute function to ever get anything other than the resolved values.







[GitHub] [airflow] msumit commented on a change in pull request #17100: WIP: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
msumit commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r691753057



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -789,6 +825,27 @@ def from_dict(cls, serialized_obj: dict) -> 'SerializedDAG':
             raise ValueError(f"Unsure how to deserialize version {ver!r}")
         return cls.deserialize_dag(serialized_obj['dag'])
 
+    @classmethod
+    def _serialize_dag_params(cls, dag_params: Dict[str, Param]):
+        """ """
+        serialized_params = {}
+        for k, v in dag_params.items():
+            serialized_params[k] = v.__dict__
+            serialized_params[k].update({'__type': f'{v.__class__.__module__}.{v.__class__.__name__}'})
+
+        return serialized_params
+
+    @classmethod
+    def _deserialize_dag_params(cls, encoded_dag_params: Dict) -> Dict[str, Param]:
+        """ """
+        op_params = {}
+        for k, v in encoded_dag_params.items():
+            param_class = import_string(v['__type'])

Review comment:
       We'll go ahead with option 1 for now and keep a TODO item for the 2nd one. 
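A minimal sketch of the `__type`-based round trip in the hunk above. It assumes a simplified stand-in `Param` (no validation) and uses `importlib` in place of Airflow's `import_string`. One deliberate difference: it copies `param.__dict__` before adding `__type`, because the hunk as quoted assigns `v.__dict__` itself and then calls `.update()`, which would also add a `__type` attribute to the live `Param` object.

```python
import importlib
from typing import Any, Dict


class Param:
    """Simplified stand-in for airflow.models.param.Param (validation omitted)."""

    def __init__(self, default: Any = None, description: str = None, **kwargs):
        self.default = default
        self.description = description
        self.schema = kwargs.pop("schema") if "schema" in kwargs else kwargs


def _import_string(dotted_path: str):
    """Import a class from 'package.module.ClassName' (stand-in for import_string)."""
    module_path, _, class_name = dotted_path.rpartition(".")
    return getattr(importlib.import_module(module_path), class_name)


def serialize_params(params: Dict[str, Param]) -> Dict[str, dict]:
    serialized = {}
    for key, param in params.items():
        data = dict(param.__dict__)  # copy, so the live Param object is not modified
        data["__type"] = f"{type(param).__module__}.{type(param).__name__}"
        serialized[key] = data
    return serialized


def deserialize_params(encoded: Dict[str, dict]) -> Dict[str, Param]:
    params = {}
    for key, data in encoded.items():
        data = dict(data)  # don't mutate the caller's dict either
        param_class = _import_string(data.pop("__type"))
        # default, description and schema map straight back onto __init__
        params[key] = param_class(**data)
    return params
```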







[GitHub] [airflow] turbaszek commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r706638528



##########
File path: airflow/example_dags/example_complex_params.py
##########
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the complex params."""
+
+from airflow import DAG
+from airflow.models.param import Param
+from airflow.operators.bash import BashOperator
+from airflow.utils.dates import days_ago
+
+with DAG(
+    "example_complex_params",
+    params={
+        'int_param': Param(10, type="integer", minimum=0, maximum=20),  # non default int param
+        'str_param': Param(type="string", minLength=2, maxLength=4),  # a mandatory str param
+        'old_param': 'old_way_of_passing',
+        'simple_param': Param('im_just_like_old_param'),  # i.e. no type checking
+        'email_param': Param(
+            'example@example.com', type='string', format='idn-email', minLength=5, maxLength=255
+        ),
+    },

Review comment:
       Should we add some documentation around `Param`? It's a user-facing feature, so users should be able to understand how it works and how to use it.
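As a possible starting point for such documentation, a stdlib-only sketch of how the `Param` constructor quoted elsewhere in this PR turns keyword arguments into a JSON Schema: every keyword except `default` and `description` becomes part of the schema, unless an explicit `schema=` dict is passed. `ParamSketch` is a hypothetical stand-in that leaves out the `jsonschema` validation step.

```python
from typing import Any


class ParamSketch:
    """Mirrors how Param builds its schema; validation is intentionally left out."""

    def __init__(self, default: Any = None, description: str = None, **kwargs):
        self.default = default
        self.description = description
        # An explicit schema= wins; otherwise the remaining kwargs form the schema.
        self.schema = kwargs.pop("schema") if "schema" in kwargs else kwargs


int_param = ParamSketch(10, type="integer", minimum=0, maximum=20)
str_param = ParamSketch(type="string", minLength=2, maxLength=4)  # no default: mandatory
explicit = ParamSketch(5, schema={"type": "integer"})

print(int_param.schema)  # {'type': 'integer', 'minimum': 0, 'maximum': 20}
print(str_param.default, str_param.schema)  # None {'type': 'string', 'minLength': 2, 'maxLength': 4}
```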







[GitHub] [airflow] msumit commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
msumit commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r707016819



##########
File path: airflow/models/param.py
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Dict, Optional
+
+import jsonschema
+from jsonschema import FormatChecker
+from jsonschema.exceptions import ValidationError
+
+from airflow.exceptions import AirflowException
+
+
+class Param:
+    """
+    Class to hold the default value of a Param and rule set to do the validations. Without the rule set
+    it always validates and returns the default value.
+
+    :param default: The value of this Param object holds
+    :type default: Any
+    :param description: Optional help text for the Param
+    :type description: str
+    :param schema: The validation schema of the Param, if not given then all kwargs except
+        default & description will form the schema
+    :type schema: dict
+    """
+
+    def __init__(self, default: Any = None, description: str = None, **kwargs):
+        self.default = default
+        self.description = description
+        self.schema = kwargs.pop('schema') if 'schema' in kwargs else kwargs
+
+        # If default is not None, then validate it once, may raise ValueError
+        if default:
+            try:
+                jsonschema.validate(self.default, self.schema, format_checker=FormatChecker())
+            except ValidationError as err:
+                raise ValueError(err)
+
+    def resolve(self, value: Optional[Any] = None, suppress_exception: bool = False) -> Any:
+        """
+        Runs the validations and returns the Param's final value.
+        May raise ValueError on failed validations.
+
+        :param value: The value to be updated for the Param
+        :type: Optional[Any]
+        :param suppress_exception: To raise an exception or not when the validations fails.
+            If true and validations fails, the return value would be None.
+        :type suppress_exception: bool
+        """
+        try:
+            final_val = value or self.default
+            jsonschema.validate(final_val, self.schema, format_checker=FormatChecker())
+            self.default = final_val
+        except ValidationError as err:
+            if suppress_exception:
+                return None
+            raise ValueError(err) from None

Review comment:
       To suppress the exception context chain, see https://www.python.org/dev/peps/pep-0409
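A small stdlib-only illustration of what PEP 409 changes, using a hypothetical `validate`/`resolve` pair and a local `ValidationError` in place of the `jsonschema` one: with `from None`, the re-raised `ValueError` has `__suppress_context__` set, so the printed traceback no longer includes the original exception under "During handling of the above exception, another exception occurred".

```python
class ValidationError(Exception):
    """Stand-in for jsonschema.exceptions.ValidationError."""


def validate(value) -> None:
    if not isinstance(value, int):
        raise ValidationError(f"{value!r} is not of type 'integer'")


def resolve(value, suppress_context: bool = True):
    try:
        validate(value)
        return value
    except ValidationError as err:
        if suppress_context:
            raise ValueError(err) from None  # PEP 409: hide the chained context
        raise ValueError(err)  # implicit chaining: both tracebacks are printed
```

The original exception is still stored as `__context__`; `from None` only hides it from the printed traceback.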







[GitHub] [airflow] msumit commented on a change in pull request #17100: WIP: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
msumit commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r682325963



##########
File path: airflow/cli/commands/dag_command.py
##########
@@ -104,21 +104,24 @@ def dag_backfill(args, dag=None):
                 dag_run_state=State.NONE,
             )
 
-        dag.run(
-            start_date=args.start_date,
-            end_date=args.end_date,
-            mark_success=args.mark_success,
-            local=args.local,
-            donot_pickle=(args.donot_pickle or conf.getboolean('core', 'donot_pickle')),
-            ignore_first_depends_on_past=args.ignore_first_depends_on_past,
-            ignore_task_deps=args.ignore_dependencies,
-            pool=args.pool,
-            delay_on_limit_secs=args.delay_on_limit,
-            verbose=args.verbose,
-            conf=run_conf,
-            rerun_failed_tasks=args.rerun_failed_tasks,
-            run_backwards=args.run_backwards,
-        )
+        try:
+            dag.run(
+                start_date=args.start_date,
+                end_date=args.end_date,
+                mark_success=args.mark_success,
+                local=args.local,
+                donot_pickle=(args.donot_pickle or conf.getboolean('core', 'donot_pickle')),
+                ignore_first_depends_on_past=args.ignore_first_depends_on_past,
+                ignore_task_deps=args.ignore_dependencies,
+                pool=args.pool,
+                delay_on_limit_secs=args.delay_on_limit,
+                verbose=args.verbose,
+                conf=run_conf,
+                rerun_failed_tasks=args.rerun_failed_tasks,
+                run_backwards=args.run_backwards,
+            )
+        except ValueError as vr:
+            logging.error(str(vr))

Review comment:
       To display the error message on the CLI
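A minimal sketch of the pattern, assuming a hypothetical `run_dag` that raises `ValueError` when a param fails validation: catching it and logging `str(err)` shows the CLI user a single readable line instead of a traceback. Returning a bool is an extra assumption made here so the outcome is easy to check.

```python
import logging

log = logging.getLogger("dag_backfill_sketch")


def run_dag(conf: dict) -> None:
    """Hypothetical stand-in for dag.run(); rejects an invalid param value."""
    if not isinstance(conf.get("int_param"), int):
        raise ValueError(f"int_param: {conf.get('int_param')!r} is not of type 'integer'")


def dag_backfill(conf: dict) -> bool:
    try:
        run_dag(conf)
    except ValueError as err:
        log.error(str(err))  # one readable line, no traceback
        return False
    return True
```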







[GitHub] [airflow] paantya commented on pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
paantya commented on pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#issuecomment-908325158


   See https://github.com/apache/airflow/issues/17912





[GitHub] [airflow] ashb commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r701805091



##########
File path: airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
##########
@@ -100,15 +99,16 @@ def __init__(
         self.facebook_conn_id = facebook_conn_id
         self.api_version = api_version
         self.fields = fields
-        self.params = params
         self.gzip = gzip
         self.impersonation_chain = impersonation_chain
 
     def execute(self, context: dict):
         service = FacebookAdsReportingHook(
             facebook_conn_id=self.facebook_conn_id, api_version=self.api_version
         )
-        rows = service.bulk_facebook_report(params=self.params, fields=self.fields)
+        rows = service.bulk_facebook_report(
+            params={k: v.resolve() for k, v in self.params.items()}, fields=self.fields
+        )

Review comment:
       We can do that, but I think in this particular case not having to deal with this at all in the Operator is a better solution, as it means we don't have to deal with this in every operator, nor have to tell users how to deal with it in their custom operators etc.







[GitHub] [airflow] msumit commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
msumit commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r702400440



##########
File path: airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
##########
@@ -100,15 +99,16 @@ def __init__(
         self.facebook_conn_id = facebook_conn_id
         self.api_version = api_version
         self.fields = fields
-        self.params = params
         self.gzip = gzip
         self.impersonation_chain = impersonation_chain
 
     def execute(self, context: dict):
         service = FacebookAdsReportingHook(
             facebook_conn_id=self.facebook_conn_id, api_version=self.api_version
         )
-        rows = service.bulk_facebook_report(params=self.params, fields=self.fields)
+        rows = service.bulk_facebook_report(
+            params={k: v.resolve() for k, v in self.params.items()}, fields=self.fields
+        )

Review comment:
       @ashb I had exactly the same code before introducing `ParamsDict`. The whole reason to introduce `ParamsDict` was to make the DAG or task params auto-resolve using the dunder methods, so going back to the old way now defeats the whole purpose. Also, in that method we'd have to put an `if` condition to check whether it's a string or a Param object and then call `resolve` on it, which is a bit ugly and leaves holes open for other unwanted issues/misuses in the future.
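A minimal sketch of the `ParamsDict` idea described above, assuming simplified stand-ins: `ParamStub` (with a type-check `resolve()`) and `ParamsDictSketch` built on `collections.abc.MutableMapping` are hypothetical, and the real classes in this PR may behave differently. Plain values are wrapped on `__setitem__` and resolved on `__getitem__`, so code like `params['my_param']` always sees a resolved value without an `isinstance` check at each call site.

```python
from collections.abc import MutableMapping
from typing import Any, Dict, Iterator, Optional


class ParamStub:
    """Stand-in for Param: holds a default and an optional expected Python type."""

    def __init__(self, default: Any = None, type_: Optional[type] = None):
        self.default = default
        self.type_ = type_

    def resolve(self) -> Any:
        if self.type_ is not None and not isinstance(self.default, self.type_):
            raise ValueError(f"{self.default!r} is not of type {self.type_.__name__}")
        return self.default


class ParamsDictSketch(MutableMapping):
    """Stores ParamStub objects but hands out resolved values."""

    def __init__(self, initial: Optional[Dict[str, Any]] = None):
        self._data: Dict[str, ParamStub] = {}
        for key, value in (initial or {}).items():
            self[key] = value

    def __setitem__(self, key: str, value: Any) -> None:
        # Old-style plain values are wrapped so both styles behave the same.
        self._data[key] = value if isinstance(value, ParamStub) else ParamStub(value)

    def __getitem__(self, key: str) -> Any:
        return self._data[key].resolve()

    def __delitem__(self, key: str) -> None:
        del self._data[key]

    def __iter__(self) -> Iterator[str]:
        return iter(self._data)

    def __len__(self) -> int:
        return len(self._data)
```

Because `dict(params)` and `params.items()` go through `__getitem__`, an operator that passes `self.params` straight through would receive plain values.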







[GitHub] [airflow] ashb commented on a change in pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r701792582



##########
File path: airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
##########
@@ -100,15 +99,16 @@ def __init__(
         self.facebook_conn_id = facebook_conn_id
         self.api_version = api_version
         self.fields = fields
-        self.params = params
         self.gzip = gzip
         self.impersonation_chain = impersonation_chain
 
     def execute(self, context: dict):
         service = FacebookAdsReportingHook(
             facebook_conn_id=self.facebook_conn_id, api_version=self.api_version
         )
-        rows = service.bulk_facebook_report(params=self.params, fields=self.fields)
+        rows = service.bulk_facebook_report(
+            params={k: v.resolve() for k, v in self.params.items()}, fields=self.fields
+        )

Review comment:
       Oh right, gotcha.
   
   I think the main thing here is that `dag.params` is so rarely used that this wasn't the "intended" behaviour, so this is likely a bug that just no one has hit.
   
   My main concern with this change is that because the providers are released separately, this would create a versioning nightmare where the old provider can't work with new Airflow and vice-versa.
   
   For example (all of these assuming a DAG that does use `params`):
   
   - If you have the current provider and install Airflow 2.2, then `task.params` would end up with Param objects in it, not the "actual" values.
   - If you have Airflow 2.1 and install the next version of the provider that includes this change, calling `resolve()` on a plain value would fail with (for example) `AttributeError: 'str' object has no attribute 'resolve'`.
   
   So is it possible to handle this entirely in core Airflow, not in the operators, so that by the time `operator.execute()` gets its hands on `self.params` it is just the values, not the Param etc?
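One way to read this suggestion, as a minimal sketch: core code resolves params once, just before calling `execute()`, so operators only ever see plain values. `ParamStub`, `EchoOperator` and `run_task` are hypothetical names. Note that this version uses the explicit `isinstance` check that the `ParamsDict` approach in this thread is meant to avoid; it is shown only to make the trade-off concrete.

```python
from typing import Any, Dict


class ParamStub:
    """Stand-in for Param with a trivial resolve()."""

    def __init__(self, default: Any = None):
        self.default = default

    def resolve(self) -> Any:
        return self.default


class EchoOperator:
    """Hypothetical operator that, like the Facebook Ads one, passes params on as-is."""

    def __init__(self, params: Dict[str, Any]):
        self.params = params

    def execute(self, context: dict) -> Dict[str, Any]:
        return dict(self.params)  # no resolve() calls needed in the operator


def run_task(task: EchoOperator) -> Dict[str, Any]:
    # Core resolves once; plain (old-style) values pass through untouched, so an
    # old provider never sees Param objects and no operator calls resolve() on a str.
    task.params = {
        key: value.resolve() if isinstance(value, ParamStub) else value
        for key, value in task.params.items()
    }
    return task.execute(context={})
```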







[GitHub] [airflow] github-actions[bot] commented on pull request #17100: Advanced Params using json-schema

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#issuecomment-916423794


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] msumit commented on a change in pull request #17100: Advanced Params using json-schema

msumit commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r707015703



##########
File path: airflow/example_dags/example_complex_params.py
##########
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the complex params."""
+
+from airflow import DAG
+from airflow.models.param import Param
+from airflow.operators.bash import BashOperator
+from airflow.utils.dates import days_ago
+
+with DAG(
+    "example_complex_params",
+    params={
+        'int_param': Param(10, type="integer", minimum=0, maximum=20),  # non default int param
+        'str_param': Param(type="string", minLength=2, maxLength=4),  # a mandatory str param
+        'old_param': 'old_way_of_passing',
+        'simple_param': Param('im_just_like_old_param'),  # i.e. no type checking
+        'email_param': Param(
+            'example@example.com', type='string', format='idn-email', minLength=5, maxLength=255
+        ),
+    },

Review comment:
       Yup, working on it. 
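The example DAG attaches JSON Schema keywords (`type`, `minimum`, `maximum`, `minLength`, `maxLength`) to each `Param`. The hypothetical `check_value` below is a rough, stdlib-only sketch of what checking those keywords involves. It hand-implements a small subset and ignores other keywords such as `format`, so it is not how the PR validates (the PR title indicates it relies on json-schema).

```python
from typing import Any, Dict

# Maps the JSON Schema "type" names used in the example DAG to Python types.
_TYPES = {"integer": int, "string": str, "boolean": bool, "number": (int, float)}


def check_value(value: Any, schema: Dict[str, Any]) -> Any:
    """Validate value against a small, hand-picked subset of JSON Schema keywords."""
    expected = schema.get("type")
    if expected is not None:
        # bool is a subclass of int in Python, so reject it for numeric types.
        if isinstance(value, bool) and expected != "boolean":
            raise ValueError(f"{value!r} is not of type {expected!r}")
        if not isinstance(value, _TYPES[expected]):
            raise ValueError(f"{value!r} is not of type {expected!r}")
    if "minimum" in schema and value < schema["minimum"]:
        raise ValueError(f"{value!r} is less than the minimum of {schema['minimum']}")
    if "maximum" in schema and value > schema["maximum"]:
        raise ValueError(f"{value!r} is greater than the maximum of {schema['maximum']}")
    if "minLength" in schema and len(value) < schema["minLength"]:
        raise ValueError(f"{value!r} is shorter than {schema['minLength']}")
    if "maxLength" in schema and len(value) > schema["maxLength"]:
        raise ValueError(f"{value!r} is longer than {schema['maxLength']}")
    return value


schema = {"type": "integer", "minimum": 0, "maximum": 20}  # as for int_param
print(check_value(10, schema))  # 10
try:
    check_value(25, schema)
except ValueError as err:
    print(err)  # 25 is greater than the maximum of 20
```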







[GitHub] [airflow] msumit commented on a change in pull request #17100: Advanced Params using json-schema

msumit commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r701028249



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -346,6 +348,9 @@ def _deserialize(cls, encoded_var: Any) -> Any:
             return {cls._deserialize(v) for v in var}
         elif type_ == DAT.TUPLE:
             return tuple(cls._deserialize(v) for v in var)
+        elif type_ == DAT.PARAM:
+            param_class = import_string(var['_type'])
+            return param_class(**var)

Review comment:
       Yeah, it would be. It's harmless, but I will fix this.
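In the diff, `param_class(**var)` forwards every key of the encoded dict, including `_type`, to the constructor. A constructor that accepts arbitrary keywords would absorb it harmlessly, but one that does not would raise `TypeError`. A minimal sketch of copying the dict and popping the marker first. `import_class` and `deserialize_object` are hypothetical names, `importlib` stands in for Airflow's `import_string`, and `datetime.timedelta` is used only as an easy-to-check target class:

```python
import importlib
from datetime import timedelta
from typing import Any, Dict


def import_class(path: str) -> type:
    """Import "package.module.ClassName" and return the class."""
    module_name, _, class_name = path.rpartition(".")
    return getattr(importlib.import_module(module_name), class_name)


def deserialize_object(var: Dict[str, Any]) -> Any:
    """Rebuild an object from {"_type": <dotted path>, **constructor_kwargs}."""
    kwargs = dict(var)                # copy so the encoded dict is left untouched
    class_path = kwargs.pop("_type")  # the type marker is not a constructor argument
    return import_class(class_path)(**kwargs)


encoded = {"_type": "datetime.timedelta", "days": 1, "hours": 2}
print(deserialize_object(encoded) == timedelta(days=1, hours=2))  # True
```

Without the `pop`, this call would pass `_type=...` to `timedelta`, which rejects unknown keyword arguments.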







[GitHub] [airflow] ashb commented on a change in pull request #17100: WIP: Advanced Params using json-schema

ashb commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r682515041



##########
File path: airflow/cli/commands/dag_command.py
##########
@@ -104,21 +104,24 @@ def dag_backfill(args, dag=None):
                 dag_run_state=State.NONE,
             )
 
-        dag.run(
-            start_date=args.start_date,
-            end_date=args.end_date,
-            mark_success=args.mark_success,
-            local=args.local,
-            donot_pickle=(args.donot_pickle or conf.getboolean('core', 'donot_pickle')),
-            ignore_first_depends_on_past=args.ignore_first_depends_on_past,
-            ignore_task_deps=args.ignore_dependencies,
-            pool=args.pool,
-            delay_on_limit_secs=args.delay_on_limit,
-            verbose=args.verbose,
-            conf=run_conf,
-            rerun_failed_tasks=args.rerun_failed_tasks,
-            run_backwards=args.run_backwards,
-        )
+        try:
+            dag.run(
+                start_date=args.start_date,
+                end_date=args.end_date,
+                mark_success=args.mark_success,
+                local=args.local,
+                donot_pickle=(args.donot_pickle or conf.getboolean('core', 'donot_pickle')),
+                ignore_first_depends_on_past=args.ignore_first_depends_on_past,
+                ignore_task_deps=args.ignore_dependencies,
+                pool=args.pool,
+                delay_on_limit_secs=args.delay_on_limit,
+                verbose=args.verbose,
+                conf=run_conf,
+                rerun_failed_tasks=args.rerun_failed_tasks,
+                run_backwards=args.run_backwards,
+            )
+        except ValueError as vr:
+            logging.error(str(vr))

Review comment:
       Ah yeah.
   
   We should also exit with a non-0 exit code in this case.
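A minimal sketch of the suggestion. `backfill` and `failing_run` are hypothetical stand-ins for the CLI handler and for `dag.run(...)`, which the diff expects to raise `ValueError` on invalid params. The handler logs the error and then exits with status 1 instead of returning normally.

```python
import logging
import sys
from typing import Any, Callable, Dict


def backfill(run_dag: Callable[..., None], run_conf: Dict[str, Any]) -> None:
    """Run the DAG; on invalid params, log the error and exit with status 1."""
    try:
        run_dag(conf=run_conf)
    except ValueError as err:
        logging.error(str(err))
        # Exiting non-zero lets shell scripts and schedulers detect the failure.
        sys.exit(1)


def failing_run(conf: Dict[str, Any]) -> None:
    """Hypothetical stand-in for dag.run() rejecting a missing required param."""
    if "str_param" not in conf:
        raise ValueError("str_param is required")
```

Alternatively, `raise SystemExit(str(err))` also exits with status 1 and prints the message to stderr.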







[GitHub] [airflow] msumit commented on a change in pull request #17100: WIP: Advanced Params using json-schema

msumit commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r694062958



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -789,6 +825,27 @@ def from_dict(cls, serialized_obj: dict) -> 'SerializedDAG':
             raise ValueError(f"Unsure how to deserialize version {ver!r}")
         return cls.deserialize_dag(serialized_obj['dag'])
 
+    @classmethod
+    def _serialize_dag_params(cls, dag_params: Dict[str, Param]):
+        """ """
+        serialized_params = {}
+        for k, v in dag_params.items():
+            serialized_params[k] = v.__dict__
+            serialized_params[k].update({'__type': f'{v.__class__.__module__}.{v.__class__.__name__}'})
+
+        return serialized_params
+
+    @classmethod
+    def _deserialize_dag_params(cls, encoded_dag_params: Dict) -> Dict[str, Param]:
+        """ """
+        op_params = {}
+        for k, v in encoded_dag_params.items():
+            param_class = import_string(v['__type'])

Review comment:
       Done. Will add unit tests around this as well.
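One detail worth covering in those tests: in `_serialize_dag_params` as written in the diff, `serialized_params[k] = v.__dict__` stores a reference to the object's own attribute dict. The following `.update({'__type': ...})` therefore also adds `__type` to the live `Param` instance. A minimal sketch of a copying variant plus a round-trip check, using a hypothetical `Param` stand-in and hypothetical function names:

```python
from typing import Any, Dict


class Param:
    """Hypothetical stand-in for a Param holding a default and schema keywords."""

    def __init__(self, default: Any = None, **schema: Any):
        self.default = default
        self.schema = schema


def serialize_params(params: Dict[str, Param]) -> Dict[str, Dict[str, Any]]:
    serialized = {}
    for key, param in params.items():
        # Copy vars(param) so adding the type marker doesn't touch the live object.
        entry = dict(vars(param))
        entry["__type"] = f"{type(param).__module__}.{type(param).__name__}"
        serialized[key] = entry
    return serialized


def deserialize_params(encoded: Dict[str, Dict[str, Any]]) -> Dict[str, Param]:
    params = {}
    for key, entry in encoded.items():
        fields = {k: v for k, v in entry.items() if k != "__type"}
        # A real implementation would import the class named by "__type";
        # this sketch only knows about the stand-in Param.
        params[key] = Param(fields["default"], **fields["schema"])
    return params


p = Param(10, type="integer", minimum=0)
restored = deserialize_params(serialize_params({"int_param": p}))["int_param"]
print(restored.default, restored.schema)  # 10 {'type': 'integer', 'minimum': 0}
```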







[GitHub] [airflow] kaxil commented on a change in pull request #17100: WIP: Advanced Params using json-schema

kaxil commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r679573636



##########
File path: airflow/models/dag.py
##########
@@ -1976,7 +1981,7 @@ def create_dagrun(
         run_id: Optional[str] = None,
         start_date: Optional[datetime] = None,
         external_trigger: Optional[bool] = False,
-        conf: Optional[dict] = None,
+        conf: Optional[dict] = {},

Review comment:
       This will suffer from one of the common Python gotchas: mutable default arguments.
   
   https://docs.python-guide.org/writing/gotchas/#mutable-default-arguments
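A short illustration of the linked gotcha and the usual `None`-sentinel fix. The function names are hypothetical and are not Airflow's `create_dagrun`:

```python
from typing import Any, Dict, Optional


def create_run_buggy(conf: Dict[str, Any] = {}) -> Dict[str, Any]:
    # The default {} is created once, at definition time, and shared by every call.
    conf.setdefault("runs", 0)
    conf["runs"] += 1
    return conf


def create_run(conf: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    # Keep None as the default and build a fresh dict on each call instead.
    if conf is None:
        conf = {}
    conf.setdefault("runs", 0)
    conf["runs"] += 1
    return conf


print(create_run_buggy(), create_run_buggy())  # {'runs': 2} {'runs': 2}
print(create_run(), create_run())              # {'runs': 1} {'runs': 1}
```

A shared `{}` only causes trouble once something mutates it, but keeping `Optional[dict] = None` in the signature removes that risk entirely.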
   
   







[GitHub] [airflow] msumit commented on a change in pull request #17100: WIP: Advanced Params using json-schema

msumit commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r684743458



##########
File path: airflow/models/taskinstance.py
##########
@@ -1522,7 +1523,7 @@ def is_eligible_to_retry(self):
         return self.task.retries and self.try_number <= self.max_tries
 
     @provide_session
-    def get_template_context(self, session=None) -> Context:
+    def get_template_context(self, ignore_param_exceptions=True, session=None) -> Context:

Review comment:
       `get_template_context` is called from many places, but we only want to raise exceptions from a couple of them, hence ignoring them by default.
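A minimal sketch of the trade-off described here, assuming param resolution failures surface as `ValueError`. `build_context` and `must_be_positive` are hypothetical names, and the flag mirrors `ignore_param_exceptions` from the diff. Callers that only render templates keep the lenient default, and the few that must reject bad input opt in to raising. What lenient mode should put in the context is an assumption; this sketch keeps the raw value.

```python
from typing import Any, Callable, Dict


def build_context(
    params: Dict[str, Any],
    resolve: Callable[[str, Any], Any],
    ignore_param_exceptions: bool = True,
) -> Dict[str, Any]:
    """Resolve each param; keep or re-raise failures depending on the flag."""
    resolved = {}
    for key, value in params.items():
        try:
            resolved[key] = resolve(key, value)
        except ValueError:
            if not ignore_param_exceptions:
                raise
            # Lenient mode: keep the unresolved value so rendering can continue.
            resolved[key] = value
    return resolved


def must_be_positive(key: str, value: Any) -> Any:
    """Hypothetical validator used for the example."""
    if value <= 0:
        raise ValueError(f"{key} must be positive, got {value}")
    return value


print(build_context({"a": 1, "b": -1}, must_be_positive))  # {'a': 1, 'b': -1}
```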







[GitHub] [airflow] paantya edited a comment on pull request #17100: Advanced Params using json-schema

paantya edited a comment on pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#issuecomment-908325158


   see also 
   https://github.com/apache/airflow/issues/17912





[GitHub] [airflow] msumit commented on a change in pull request #17100: WIP: Advanced Params using json-schema

msumit commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r691859343



##########
File path: airflow/models/dag.py
##########
@@ -2312,6 +2327,19 @@ def set_edge_info(self, upstream_task_id: str, downstream_task_id: str, info: Ed
         """
         self.edge_info.setdefault(upstream_task_id, {})[downstream_task_id] = info
 
+    def validate_schedule_and_params(self):

Review comment:
       @uranusjr thanks. Any idea when you'll be merging #17414?







[GitHub] [airflow] msumit commented on pull request #17100: Advanced Params using json-schema

msumit commented on pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#issuecomment-910405928


   @ashb @kaxil @potiuk @uranusjr @turbaszek would you folks be able to review this PR? TIA.





[GitHub] [airflow] msumit commented on a change in pull request #17100: Advanced Params using json-schema

msumit commented on a change in pull request #17100:
URL: https://github.com/apache/airflow/pull/17100#discussion_r701030931



##########
File path: airflow/providers/google/marketing_platform/operators/display_video.py
##########
@@ -362,15 +362,13 @@ class GoogleDisplayVideo360RunReportOperator(BaseOperator):
 
     template_fields = (
         "report_id",
-        "params",
         "impersonation_chain",
     )
 
     def __init__(
         self,
         *,
         report_id: str,
-        params: Dict[str, Any],

Review comment:
       Not sure I understood this. Since this class defines and uses its own `params`, there is no way to access the base `params`, so only a single `params` is ever accessible.
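A plain-Python illustration of the point, using hypothetical class names rather than Airflow's `BaseOperator`. When the subclass consumes the `params` keyword itself and assigns `self.params`, it shares one attribute with the base class on the instance, so only a single `params` value is ever reachable.

```python
from typing import Any, Dict, Optional


class BaseOp:
    def __init__(self, params: Optional[Dict[str, Any]] = None):
        # The base class stores params on self.params.
        self.params = params or {}


class ReportOp(BaseOp):
    def __init__(self, *, report_id: str, params: Dict[str, Any], **kwargs: Any):
        # `params` is consumed here, so BaseOp.__init__ never receives it...
        super().__init__(**kwargs)
        self.report_id = report_id
        # ...and this assignment replaces whatever BaseOp stored in self.params.
        self.params = params


op = ReportOp(report_id="r1", params={"format": "csv"})
print(op.params)  # {'format': 'csv'}
```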



