Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/06/25 14:13:31 UTC
[GitHub] [airflow] consciencee opened a new issue, #24653: Mapped TriggerDagRunOperator causes SerializationError due to operator_extra_links 'property' object is not iterable
consciencee opened a new issue, #24653:
URL: https://github.com/apache/airflow/issues/24653
### Apache Airflow version
2.3.2 (latest released)
### What happened
Hi, I have an issue launching several sub-DAGs by mapping TriggerDagRunOperator over its `conf` parameter. Here is a demo example of my typical DAG:
```python
from datetime import datetime

from airflow import DAG, XComArg
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    'triggerer',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2019, 12, 2),
) as dag:
    # Returns [{"x": 0}, {"x": 1}, ..., {"x": 9}]: one conf dict per triggered run.
    t1 = PythonOperator(
        task_id='first',
        python_callable=lambda: [{"x": i} for i in range(10)],
    )
    # Mapped over `conf`: one TriggerDagRunOperator instance per element of t1's output.
    t2 = TriggerDagRunOperator.partial(
        task_id='second',
        trigger_dag_id='mydag',
    ).expand(conf=XComArg(t1))
    t1 >> t2
```
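To make the intent of this DAG concrete: the `first` task returns ten conf dicts, and mapping `second` over them is meant to produce one TriggerDagRunOperator task instance per dict (map indexes 0 to 9), each triggering `mydag` with its own `conf`. The plain-Python model below only illustrates that fan-out; `model_mapped_trigger_runs` is a hypothetical helper and is not how Airflow implements mapping internally.

```python
def model_mapped_trigger_runs(trigger_dag_id, conf_values):
    """Hypothetical model, not Airflow code: one mapped task instance per
    element of `conf_values`, each triggering `trigger_dag_id` with that
    element as its `conf`."""
    return [
        {"map_index": index, "trigger_dag_id": trigger_dag_id, "conf": conf}
        for index, conf in enumerate(conf_values)
    ]


# Same value that the 'first' task returns in the DAG above.
upstream_output = [{"x": i} for i in range(10)]
runs = model_mapped_trigger_runs("mydag", upstream_output)
print(len(runs))  # 10
print(runs[3])    # {'map_index': 3, 'trigger_dag_id': 'mydag', 'conf': {'x': 3}}
```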
However, when Airflow tries to import this DAG, it throws the following SerializationError (which I observed both in the UI and in `$AIRFLOW_HOME/logs/scheduler/latest/<my_dag_name>.py.log`):
```
Broken DAG: [/home/aliona/airflow/dags/triggerer_dag.py] Traceback (most recent call last):
  File "/home/aliona/airflow/venv/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 638, in _serialize_node
    serialize_op['_operator_extra_links'] = cls._serialize_operator_extra_links(
  File "/home/aliona/airflow/venv/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 933, in _serialize_operator_extra_links
    for operator_extra_link in operator_extra_links:
TypeError: 'property' object is not iterable

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/aliona/airflow/venv/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 1106, in to_dict
    json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
  File "/home/aliona/airflow/venv/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 1014, in serialize_dag
    raise SerializationError(f'Failed to serialize DAG {dag.dag_id!r}: {e}')
airflow.exceptions.SerializationError: Failed to serialize DAG 'triggerer': 'property' object is not iterable
```
How it appears in the UI:
![image](https://user-images.githubusercontent.com/23297330/175775674-f3375c0e-7ea7-4b6a-84e8-b02ee8f02062.png)
### What you think should happen instead
I think a TriggerDagRunOperator mapped over the `conf` parameter should serialize and run correctly out of the box, without any workaround.
While debugging, I found that a plain (non-mapped) TriggerDagRunOperator has the value `['Triggered DAG']` in its `operator_extra_links` field, i.e. a list, whereas for a mapped TriggerDagRunOperator the field is a `property` object. I don't know why Airflow cannot get the value of this property during serialization, but reassigning the field to `['Triggered DAG']` fixed the issue.
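The traceback is consistent with a general Python behavior: reading a property through the class, rather than through an instance, returns the `property` descriptor itself, and that descriptor cannot be iterated. The sketch below reproduces this with hypothetical stand-in classes (`FakeLink`, `FakeTriggerOperator`, not Airflow classes); it assumes, without confirmation from this report, that the mapped operator ends up reading `operator_extra_links` from the operator class.

```python
class FakeLink:
    """Hypothetical stand-in for an operator extra link (not an Airflow class)."""

    name = "Triggered DAG"


class FakeTriggerOperator:
    """Hypothetical operator that exposes its extra links through a property."""

    @property
    def operator_extra_links(self):
        return [FakeLink()]


# Read through an instance: the property getter runs and returns a list.
instance_links = FakeTriggerOperator().operator_extra_links
print([link.name for link in instance_links])  # ['Triggered DAG']

# Read through the class: Python returns the property descriptor itself.
class_links = FakeTriggerOperator.operator_extra_links
print(type(class_links).__name__)  # property

# Iterating the descriptor fails with the same message the serializer reports.
error_message = None
try:
    for link in class_links:
        pass
except TypeError as exc:
    error_message = str(exc)
print(error_message)  # 'property' object is not iterable
```

If this is indeed the cause, it would explain why the workaround works: assigning a list to the attribute replaces the descriptor with an iterable value.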
For now, for every mapped TriggerDagRunOperator I add code like this at the end of my DAG file:
```python
# 'second' is the task_id of the corresponding mapped TriggerDagRunOperator task (see the demo code above)
t2_patch = dag.task_dict['second']
t2_patch.operator_extra_links = ['Triggered DAG']
dag.task_dict.update({'second': t2_patch})
```
So for every mapped TriggerDagRunOperator task, I manually set `operator_extra_links` to `['Triggered DAG']`, and the SerializationError no longer occurs. I have many such cases and all of them work with this fix: all sub-DAGs are launched and the mapped configuration is passed correctly. Both waiting and not waiting for the triggered runs to finish also work correctly.
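With many mapped tasks, hard-coding each task ID gets repetitive. A possible generalization, sketched under the assumption that the affected tasks are exactly those whose `operator_extra_links` value is a `property` object (as observed above), scans a task mapping and patches every such task. `patch_property_extra_links` is a hypothetical helper, not part of Airflow, and it is exercised here with `SimpleNamespace` stand-ins rather than real operators.

```python
from types import SimpleNamespace


def patch_property_extra_links(task_dict, replacement):
    """Hypothetical helper: replace every `operator_extra_links` value that is a
    bare `property` object with a copy of `replacement`.

    Returns the IDs of the tasks that were patched.
    """
    patched = []
    for task_id, task in task_dict.items():
        if isinstance(getattr(task, "operator_extra_links", None), property):
            # Assign a fresh list per task so patched tasks do not share one object.
            task.operator_extra_links = list(replacement)
            patched.append(task_id)
    return patched


# Stand-in tasks (not real Airflow operators): only 'second' holds a property object.
tasks = {
    "first": SimpleNamespace(operator_extra_links=[]),
    "second": SimpleNamespace(operator_extra_links=property(lambda self: [])),
}
patched_ids = patch_property_extra_links(tasks, ["Triggered DAG"])
print(patched_ids)  # ['second']
```

In a DAG file it would presumably be called at the end, e.g. `patch_property_extra_links(dag.task_dict, ['Triggered DAG'])`, mirroring the per-task workaround; this has not been tested against a real Airflow installation.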
### How to reproduce
1. Create a DAG with a mapped TriggerDagRunOperator task: put the main parameters (such as `task_id` and `trigger_dag_id`) in the `partial()` section, and in the `expand()` section pass `conf` a non-empty iterable. For example:
```python
t2 = TriggerDagRunOperator.partial(
task_id='second',
trigger_dag_id='mydag'
).expand(conf=[{'x': 1}])
```
2. Try to serialize the DAG (for example, by letting the scheduler parse the file); the error above appears.
Full example of a failing DAG file:
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    'triggerer',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2019, 12, 2),
) as dag:
    t1 = PythonOperator(
        task_id='first',
        python_callable=lambda: [{"x": i} for i in range(10)],
    )
    t2 = TriggerDagRunOperator.partial(
        task_id='second',
        trigger_dag_id='mydag',
    ).expand(conf=[{'a': 1}])
    t1 >> t2

# Uncomment these lines to work around the error:
# t2_patch = dag.task_dict['second']
# t2_patch.operator_extra_links = ['Triggered DAG']
# dag.task_dict.update({'second': t2_patch})
```
As the sub-DAG ('mydag'), I use this DAG:
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    'mydag',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2019, 12, 2),
) as dag:
    t1 = PythonOperator(
        task_id='first',
        python_callable=lambda: print("first"),
    )
    t2 = PythonOperator(
        task_id='second',
        python_callable=lambda: print("second"),
    )
    t1 >> t2
```
### Operating System
Ubuntu 22.04 LTS
### Versions of Apache Airflow Providers
apache-airflow-providers-ftp==2.1.2
apache-airflow-providers-http==2.1.2
apache-airflow-providers-imap==2.2.3
apache-airflow-providers-sqlite==2.1.3
### Deployment
Virtualenv installation
### Deployment details
Python 3.10.4
pip 22.0.2
### Anything else
For demonstration purposes I am currently using a fully local Airflow installation: a single node with SequentialExecutor and an SQLite database backend. The same issue also appears on multi-node installations with either CeleryExecutor or LocalExecutor and a PostgreSQL database backend.
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] uranusjr closed issue #24653: Mapped TriggerDagRunOperator causes SerializationError due to operator_extra_links 'property' object is not iterable
URL: https://github.com/apache/airflow/issues/24653
[GitHub] [airflow] boring-cyborg[bot] commented on issue #24653: Mapped TriggerDagRunOperator causes SerializationError due to operator_extra_links 'property' object is not iterable
URL: https://github.com/apache/airflow/issues/24653#issuecomment-1166296267
Thanks for opening your first issue here! Be sure to follow the issue template!