Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/06/25 14:13:31 UTC

[GitHub] [airflow] consciencee opened a new issue, #24653: Mapped TriggerDagRunOperator causes SerializationError due to operator_extra_links 'property' object is not iterable

consciencee opened a new issue, #24653:
URL: https://github.com/apache/airflow/issues/24653

   ### Apache Airflow version
   
   2.3.2 (latest released)
   
   ### What happened
   
   Hi, I hit an issue when launching several sub-DAGs by mapping TriggerDagRunOperator over the `conf` parameter. Here is a demo of my typical DAG:
   
   ```python
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.operators.trigger_dagrun import TriggerDagRunOperator
   from airflow import XComArg
   from datetime import datetime
   
   with DAG(
       'triggerer',
       schedule_interval=None,
       catchup=False,
       start_date=datetime(2019, 12, 2)
   ) as dag:
   
       t1 = PythonOperator(
           task_id='first',
           python_callable=lambda : list(map(lambda i: {"x": i}, list(range(10)))),
       )
   
       t2 = TriggerDagRunOperator.partial(
           task_id='second',
           trigger_dag_id='mydag'
       ).expand(conf=XComArg(t1))
   
       t1 >> t2
   ```
   
   However, when Airflow imports this DAG, it raises the following SerializationError (which I observed both in the UI and in $AIRFLOW_HOME/logs/scheduler/latest/<my_dag_name>.py.log):
   
   ```
   Broken DAG: [/home/aliona/airflow/dags/triggerer_dag.py] Traceback (most recent call last):
     File "/home/aliona/airflow/venv/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 638, in _serialize_node
       serialize_op['_operator_extra_links'] = cls._serialize_operator_extra_links(
     File "/home/aliona/airflow/venv/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 933, in _serialize_operator_extra_links
       for operator_extra_link in operator_extra_links:
   TypeError: 'property' object is not iterable
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/home/aliona/airflow/venv/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 1106, in to_dict
       json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
     File "/home/aliona/airflow/venv/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 1014, in serialize_dag
       raise SerializationError(f'Failed to serialize DAG {dag.dag_id!r}: {e}')
   airflow.exceptions.SerializationError: Failed to serialize DAG 'triggerer': 'property' object is not iterable
   ```
   
   How it appears in the UI:
   ![image](https://user-images.githubusercontent.com/23297330/175775674-f3375c0e-7ea7-4b6a-84e8-b02ee8f02062.png)
   
   
   ### What you think should happen instead
   
   I think a TriggerDagRunOperator mapped over the `conf` parameter should serialize and work by default.
   
   While debugging, I found that a plain, non-mapped TriggerDagRunOperator has the value `['Triggered DAG']` in its `operator_extra_links` field, so it is a list. For a mapped TriggerDagRunOperator, however, the field is a 'property' object. I don't know why Airflow cannot get the value of this property during serialization, but reassigning the field to `['Triggered DAG']` fixed the issue for me.
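
A minimal, self-contained sketch of how a `'property' object is not iterable` error can arise in plain Python. The classes `LinkedOperator` and `MappedStandIn` and the helper `can_iterate` are hypothetical stand-ins, not Airflow code. This report does not confirm that Airflow's mapped operator reads the attribute this way, so treat it as one plausible mechanism only.

```python
class LinkedOperator:
    """Hypothetical stand-in for an operator that exposes links via a property."""

    @property
    def operator_extra_links(self):
        return ["Triggered DAG"]


class MappedStandIn:
    """Hypothetical stand-in for a mapped task that copies the attribute from the class."""

    def __init__(self, operator_class):
        # Reading the attribute on the class (not on an instance) yields the
        # property descriptor itself rather than the list it would compute.
        self.operator_extra_links = operator_class.operator_extra_links


def can_iterate(value):
    """Return True if iter(value) succeeds, False if it raises TypeError."""
    try:
        iter(value)
    except TypeError:
        return False
    return True


# Instance-level access runs the property getter and returns the list.
instance_links = LinkedOperator().operator_extra_links

# Class-level access, as done inside the stand-in, returns a property object.
mapped = MappedStandIn(LinkedOperator)
class_level_value = mapped.operator_extra_links

# Overwriting the copied attribute with a plain list, in the spirit of the workaround.
mapped.operator_extra_links = ["Triggered DAG"]
```

Under these assumptions, looping over `class_level_value` fails the same way as the traceback above, while the overwritten attribute iterates normally.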
   
   For now, whenever I use a mapped TriggerDagRunOperator, I also add the following code at the end of my DAG file:
   ```python
   # here 'second' is the name of corresponding mapped TriggerDagRunOperator task (see demo code above)
   t2_patch = dag.task_dict['second']
   t2_patch.operator_extra_links=['Triggered DAG']
   dag.task_dict.update({'second': t2_patch})
   ```
   
   So, for every mapped TriggerDagRunOperator task, I manually set `operator_extra_links` to `['Triggered DAG']`, and the SerializationError no longer occurs. I have many such cases, and all of them work with this fix: all sub-DAGs are launched and the mapped configuration is passed correctly. Both waiting and not waiting for the triggered runs to finish work correctly as well.
   
   ### How to reproduce
   
   1. Create a DAG with a mapped TriggerDagRunOperator task. Put the main parameters, such as task_id and trigger_dag_id, in the `partial` section, and in the `expand` section pass the `conf` parameter a non-empty iterable value, for example:
   ```python
   t2 = TriggerDagRunOperator.partial(
           task_id='second',
           trigger_dag_id='mydag'
       ).expand(conf=[{'x': 1}])
   ```
   
   2. Try to serialize the DAG; the error will appear.
   
   The full example of a failing DAG file:
   ```python
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.operators.trigger_dagrun import TriggerDagRunOperator
   from airflow import XComArg
   from datetime import datetime
   
   with DAG(
       'triggerer',
       schedule_interval=None,
       catchup=False,
       start_date=datetime(2019, 12, 2)
   ) as dag:
   
       t1 = PythonOperator(
           task_id='first',
           python_callable=lambda : list(map(lambda i: {"x": i}, list(range(10)))),
       )
   
       t2 = TriggerDagRunOperator.partial(
           task_id='second',
           trigger_dag_id='mydag'
       ).expand(conf=[{'a': 1}])
   
       t1 >> t2
   
   # uncomment these lines to work around the error
   # t2_patch = dag.task_dict['second']
   # t2_patch.operator_extra_links=['Triggered DAG']
   # dag.task_dict.update({'second': t2_patch})
   ```
   As the sub-DAG ('mydag') I use this DAG:
   
   ```python
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from datetime import datetime
   
   with DAG(
       'mydag',
       schedule_interval=None,
       catchup=False,
       start_date=datetime(2019, 12, 2)
   ) as dag:
       t1 = PythonOperator(
           task_id='first',
           python_callable=lambda : print("first"),
       )
       t2 = PythonOperator(
           task_id='second',
           python_callable=lambda : print("second"),
       )
       t1 >> t2
   ```
   
   ### Operating System
   
   Ubuntu 22.04 LTS
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-ftp==2.1.2
   apache-airflow-providers-http==2.1.2
   apache-airflow-providers-imap==2.2.3
   apache-airflow-providers-sqlite==2.1.3
   
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   Python 3.10.4
   pip 22.0.2
   
   ### Anything else
   
   For demonstration purposes I am currently using a fully local Airflow installation: single node, SequentialExecutor, and an SQLite database backend. The same issue also appeared on a multi-node installation with either CeleryExecutor or LocalExecutor and a PostgreSQL database backend.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] uranusjr closed issue #24653: Mapped TriggerDagRunOperator causes SerializationError due to operator_extra_links 'property' object is not iterable

Posted by GitBox <gi...@apache.org>.
uranusjr closed issue #24653: Mapped TriggerDagRunOperator causes SerializationError due to operator_extra_links 'property' object is not iterable 
URL: https://github.com/apache/airflow/issues/24653




[GitHub] [airflow] boring-cyborg[bot] commented on issue #24653: Mapped TriggerDagRunOperator causes SerializationError due to operator_extra_links 'property' object is not iterable

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #24653:
URL: https://github.com/apache/airflow/issues/24653#issuecomment-1166296267

   Thanks for opening your first issue here! Be sure to follow the issue template!
   

