Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/10/13 15:03:30 UTC

[GitHub] [airflow] paolo-moriello opened a new pull request, #27035: Move TriggerDagRunOperator config validation from init to execute

paolo-moriello opened a new pull request, #27035:
URL: https://github.com/apache/airflow/pull/27035

   Hi all, this is my first PR to Airflow, so bear with me in case I've missed something :)
   
   I'd like to update the `TriggerDagRunOperator` to support TaskFlow, so that `conf` can be read directly from an XCom without wrapping the operator in a helper function.
   At the moment, the `TriggerDagRunOperator` validates the type of the `conf` param in `__init__`. This fails when an XCom object is passed directly with TaskFlow, because the XCom is deserialized only after the validation. The only way to use it at the moment is to wrap the operator in a task function and call `execute` manually:
   ```python
   @dag
   def taskflow_dag(**kwargs):
       @task
       def foo():
           return {"foo":"bar"}
   
       @task
       def trigger_wrapper(myconfig):
           TriggerDagRunOperator(..., conf=myconfig).execute(kwargs)
   
       myconfig = foo()
       trigger_wrapper(myconfig)
   ```
   
   while I would like to be able to do:
   
   ```python
   @dag
   def taskflow_dag(**kwargs):
       @task
       def foo():
           return {"foo":"bar"}
   
       myconfig = foo()
       TriggerDagRunOperator(..., conf=myconfig)  # currently fails: myconfig is deserialized only after __init__ has validated conf
   ```
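
For illustration, the timing problem can be sketched in plain Python without Airflow. `LazyValue`, `check_json`, `EagerTrigger` and `DeferredTrigger` are hypothetical names invented for this sketch; `LazyValue` only mimics a placeholder whose value is unknown until run time and is not Airflow's XComArg.

```python
import json


class LazyValue:
    """Hypothetical placeholder: holds no concrete value until resolve() is called."""

    def __init__(self, producer):
        self.producer = producer

    def resolve(self):
        return self.producer()


def check_json(conf):
    """Raise ValueError if conf cannot be encoded by json.dumps."""
    try:
        json.dumps(conf)
    except TypeError as err:
        raise ValueError("conf parameter should be JSON Serializable") from err


class EagerTrigger:
    """Validates conf in __init__, i.e. when the DAG is being defined."""

    def __init__(self, conf=None):
        check_json(conf)  # a LazyValue placeholder is rejected here
        self.conf = conf


class DeferredTrigger:
    """Stores conf untouched and validates only inside execute()."""

    def __init__(self, conf=None):
        self.conf = conf

    def execute(self):
        # This sketch resolves the placeholder itself; how and when Airflow
        # resolves an XComArg is outside its scope.
        conf = self.conf.resolve() if isinstance(self.conf, LazyValue) else self.conf
        check_json(conf)
        return conf


placeholder = LazyValue(lambda: {"foo": "bar"})

try:
    EagerTrigger(conf=placeholder)
    eager_ok = True
except ValueError:
    eager_ok = False

deferred_result = DeferredTrigger(conf=placeholder).execute()
```

In this sketch `eager_ok` ends up `False`, because the placeholder itself is not JSON serializable, while `deferred_result` is the resolved dict.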


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] o-nikolas commented on a diff in pull request #27035: Move TriggerDagRunOperator config validation from init to execute

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on code in PR #27035:
URL: https://github.com/apache/airflow/pull/27035#discussion_r995108431


##########
airflow/operators/trigger_dagrun.py:
##########
@@ -128,6 +123,11 @@ def execute(self, context: Context):
         else:
             parsed_execution_date = timezone.utcnow()
 
+        try:
+            json.dumps(self.conf)
+        except TypeError:
+            raise AirflowException("conf parameter should be JSON Serializable")
+

Review Comment:
   There was some discussion around allowing `XComArg`s in the initial issue, see [here](https://github.com/apache/airflow/issues/13414#issuecomment-776173348). One specific call out from @kaxil was:
   > Currently, we don't support it as we need to de-serialize it for Webserver and show it in the UI in List DagRun page. Needs a bigger change to support other objects that are not JSON serializable atm
   
   Can you do some testing around loading this page with your change, @paolo-moriello, to confirm whether more code changes are required to support this?
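
As a side note on the `json.dumps` check in the diff above, the standalone sketch below (plain Python, no Airflow) probes which exception the standard library raises for a few inputs. `json_check_outcome` is a hypothetical helper written for this illustration only.

```python
import json


def json_check_outcome(value):
    """Return "ok" if json.dumps accepts value, else the raised exception's name."""
    try:
        json.dumps(value)
    except (TypeError, ValueError) as err:
        return type(err).__name__
    return "ok"


circular = []
circular.append(circular)  # a list that contains itself

outcomes = {
    "dict of strings": json_check_outcome({"foo": "bar"}),
    "set inside dict": json_check_outcome({"foo": {"bar"}}),
    "circular list": json_check_outcome(circular),
}
```

A self-referencing structure raises `ValueError` rather than `TypeError`, so an `except TypeError` clause on its own would not turn that case into the "should be JSON Serializable" error.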





[GitHub] [airflow] potiuk merged pull request #27035: Move TriggerDagRunOperator config validation from init to execute

Posted by GitBox <gi...@apache.org>.
potiuk merged PR #27035:
URL: https://github.com/apache/airflow/pull/27035




[GitHub] [airflow] o-nikolas commented on a diff in pull request #27035: Move TriggerDagRunOperator config validation from init to execute

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on code in PR #27035:
URL: https://github.com/apache/airflow/pull/27035#discussion_r996062541


##########
airflow/operators/trigger_dagrun.py:
##########
@@ -128,6 +123,11 @@ def execute(self, context: Context):
         else:
             parsed_execution_date = timezone.utcnow()
 
+        try:
+            json.dumps(self.conf)
+        except TypeError:
+            raise AirflowException("conf parameter should be JSON Serializable")
+

Review Comment:
   Thanks! 
   Then this seems okay from my end. I'll add @kaxil as a reviewer, since he has the historical context for this snippet of code. Plus, he has the ability to merge the change if he sees fit :+1:





[GitHub] [airflow] boring-cyborg[bot] commented on pull request #27035: Move TriggerDagRunOperator config validation from init to execute

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on PR #27035:
URL: https://github.com/apache/airflow/pull/27035#issuecomment-1304847685

   Awesome work, congrats on your first merged pull request!
   




[GitHub] [airflow] paolo-moriello commented on a diff in pull request #27035: Move TriggerDagRunOperator config validation from init to execute

Posted by GitBox <gi...@apache.org>.
paolo-moriello commented on code in PR #27035:
URL: https://github.com/apache/airflow/pull/27035#discussion_r995969704


##########
airflow/operators/trigger_dagrun.py:
##########
@@ -128,6 +123,11 @@ def execute(self, context: Context):
         else:
             parsed_execution_date = timezone.utcnow()
 
+        try:
+            json.dumps(self.conf)
+        except TypeError:
+            raise AirflowException("conf parameter should be JSON Serializable")
+

Review Comment:
   Yes, this is the code I've used:
   
   ```python
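   from typing import Dict

   from airflow.decorators import dag, task
   from airflow.operators.trigger_dagrun import TriggerDagRunOperator


   # `args` (the default_args dict) is not shown in the original comment.
   @dag(
       dag_id="trigger_dag",
       schedule_interval="@hourly",
       default_args=args,
   )
   def trigger_pipeline(**kwargs):
       @task
       def foo() -> Dict[str, str]:
           return {"foo": "bar"}
   
       myconfig = foo()  # (variable) myconfig: XComArg
       TriggerDagRunOperator(task_id="tdro", trigger_dag_id="listener_dag", conf=myconfig)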
   ```
   
   





[GitHub] [airflow] paolo-moriello commented on a diff in pull request #27035: Move TriggerDagRunOperator config validation from init to execute

Posted by GitBox <gi...@apache.org>.
paolo-moriello commented on code in PR #27035:
URL: https://github.com/apache/airflow/pull/27035#discussion_r995753819


##########
airflow/operators/trigger_dagrun.py:
##########
@@ -128,6 +123,11 @@ def execute(self, context: Context):
         else:
             parsed_execution_date = timezone.utcnow()
 
+        try:
+            json.dumps(self.conf)
+        except TypeError:
+            raise AirflowException("conf parameter should be JSON Serializable")
+

Review Comment:
   The change I'm proposing here does not alter the existing logic, apart from enabling the workflow above, which otherwise fails.
   
   I've quickly tested that in the List DagRun UI I can see both the triggering and the triggered DAGs, with the related config:
   ![Screenshot 2022-10-14 at 15 19 37](https://user-images.githubusercontent.com/61800102/195857000-ced71f8d-a4f6-4d54-aa5e-e19d2ca062cb.png)
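
A rough way to preview what a JSON-based view could show for a given `conf` is to round-trip it through the `json` module. This is only a sketch in plain Python and makes no claim about how the webserver actually renders the value; `as_displayed` is a hypothetical helper.

```python
import json


def as_displayed(conf):
    """Encode conf to JSON text and decode it again."""
    return json.loads(json.dumps(conf))


# A dict of strings survives the round trip unchanged.
plain = as_displayed({"foo": "bar"})

# Tuples come back as lists, and non-string keys come back as strings.
with_tuple = as_displayed({"ids": (1, 2)})
with_int_key = as_displayed({1: "one"})
```

So a `conf` can pass the serializability check and still look slightly different after decoding, which is worth keeping in mind when comparing the triggering task's value with what the triggered run reports.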
   





[GitHub] [airflow] potiuk commented on pull request #27035: Move TriggerDagRunOperator config validation from init to execute

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #27035:
URL: https://github.com/apache/airflow/pull/27035#issuecomment-1296473454

   I re-ran the failed job to see if the failure was accidental. In any case, it might be a good idea to rebase it, @paolo-moriello. @kaxil, any comments?




[GitHub] [airflow] potiuk commented on pull request #27035: Move TriggerDagRunOperator config validation from init to execute

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #27035:
URL: https://github.com/apache/airflow/pull/27035#issuecomment-1304847658

   I think we can merge it and in case further discussion is needed we can always change it.




[GitHub] [airflow] o-nikolas commented on a diff in pull request #27035: Move TriggerDagRunOperator config validation from init to execute

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on code in PR #27035:
URL: https://github.com/apache/airflow/pull/27035#discussion_r995946332


##########
airflow/operators/trigger_dagrun.py:
##########
@@ -128,6 +123,11 @@ def execute(self, context: Context):
         else:
             parsed_execution_date = timezone.utcnow()
 
+        try:
+            json.dumps(self.conf)
+        except TypeError:
+            raise AirflowException("conf parameter should be JSON Serializable")
+

Review Comment:
   Thanks @paolo-moriello!
   > I've quickly tested that in the List DagRun UI I can see both the triggering and the triggered DAGs, with the related config:
   
   And just to confirm, the config input you tested with here was an XComArg object?
   





[GitHub] [airflow] potiuk commented on pull request #27035: Move TriggerDagRunOperator config validation from init to execute

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #27035:
URL: https://github.com/apache/airflow/pull/27035#issuecomment-1289078166

   Rebased to rebuild it




[GitHub] [airflow] paolo-moriello commented on pull request #27035: Move TriggerDagRunOperator config validation from init to execute

Posted by GitBox <gi...@apache.org>.
paolo-moriello commented on PR #27035:
URL: https://github.com/apache/airflow/pull/27035#issuecomment-1288497937

   @kaxil any update on this? It would be great to get your feedback here :)




[GitHub] [airflow] potiuk commented on pull request #27035: Move TriggerDagRunOperator config validation from init to execute

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #27035:
URL: https://github.com/apache/airflow/pull/27035#issuecomment-1289079930

   Looks cool to me BTW.




[GitHub] [airflow] boring-cyborg[bot] commented on pull request #27035: Move TriggerDagRunOperator config validation from init to execute

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on PR #27035:
URL: https://github.com/apache/airflow/pull/27035#issuecomment-1277761278

   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, mypy and type annotations). Our [pre-commits](https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature add useful documentation (in docstrings or in `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using [Breeze environment](https://github.com/apache/airflow/blob/main/BREEZE.rst) for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   



