You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/02/07 15:12:14 UTC
[GitHub] [airflow] ashb commented on pull request #21328: Rewrite decorated task mapping
ashb commented on pull request #21328:
URL: https://github.com/apache/airflow/pull/21328#issuecomment-1031573948
The serialization needs some work for this approach. Here is an incomplete test:
```python
def test_mapped_decorator_serde():
from airflow.models.xcom_arg import XComArg
from airflow.decorators import task
with DAG("test-dag", start_date=datetime(2020, 1, 1)) as dag:
task1 = BaseOperator(task_id="op1")
xcomarg = XComArg(task1, "test_key")
@task(retry_delay=30)
def x(arg1, arg2):
...
real_op = x.partial(arg1=1).map(arg2=xcomarg).operator
serialized = SerializedBaseOperator._serialize(real_op)
assert serialized == {
'_is_dummy': False,
'_is_mapped': True,
'_task_module': '???',
'_task_type': '???',
'downstream_task_ids': [],
'mapped_kwargs': {
'arg1': [
1,
2,
{"__type": "dict", "__var": {'a': 'b'}},
],
'arg2': {'__type': 'xcomref', '__var': {'task_id': 'op1', 'key': 'test_key'}},
},
'task_id': 'x',
# We don't want to include the python source code in the serialized representation
# TODO? Where does `retry_delay` go? We need to separate
}
op = SerializedBaseOperator.deserialize_operator(serialized)
assert isinstance(op, MappedOperator)
assert op.deps is MappedOperator.DEFAULT_DEPS
# TODO: add some more asserts here
```
Trying to call `real_op.unmap()` also throws a key error:
```
tests/serialization/test_dag_serialization.py:1670: in test_mapped_decorator_serde
real_op.unmap()
airflow/models/baseoperator.py:1882: in unmap
dag._remove_task(self.task_id)
airflow/models/dag.py:2167: in _remove_task
task = self.task_dict.pop(task_id)
E KeyError: 'x'
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org