Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/02/07 15:12:14 UTC

[GitHub] [airflow] ashb commented on pull request #21328: Rewrite decorated task mapping

ashb commented on pull request #21328:
URL: https://github.com/apache/airflow/pull/21328#issuecomment-1031573948


   The serialization needs some work for this approach. Here is an incomplete test:
   
   ```python
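   def test_mapped_decorator_serde():
       # Imports this snippet relies on. Module paths are assumed for the Airflow
       # version of this PR; MappedOperator's location is inferred from the
       # traceback further down (unmap lives in airflow/models/baseoperator.py).
       from datetime import datetime

       from airflow.decorators import task
       from airflow.models import DAG
       from airflow.models.baseoperator import BaseOperator, MappedOperator
       from airflow.models.xcom_arg import XComArg
       from airflow.serialization.serialized_objects import SerializedBaseOperator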
   
       with DAG("test-dag", start_date=datetime(2020, 1, 1)) as dag:
           task1 = BaseOperator(task_id="op1")
           xcomarg = XComArg(task1, "test_key")
   
           @task(retry_delay=30)
           def x(arg1, arg2):
               ...
   
       real_op = x.partial(arg1=1).map(arg2=xcomarg).operator
   
       serialized = SerializedBaseOperator._serialize(real_op)
   
       assert serialized == {
           '_is_dummy': False,
           '_is_mapped': True,
           '_task_module': '???',
           '_task_type': '???',
           'downstream_task_ids': [],
           'mapped_kwargs': {
               'arg1': [
                   1,
                   2,
                   {"__type": "dict", "__var": {'a': 'b'}},
               ],
               'arg2': {'__type': 'xcomref', '__var': {'task_id': 'op1', 'key': 'test_key'}},
           },
           'task_id': 'x',
           # We don't want to include the python source code in the serialized representation
           # TODO? Where does `retry_delay` go? We need to separate 
       }
   
       op = SerializedBaseOperator.deserialize_operator(serialized)
       assert isinstance(op, MappedOperator)
       assert op.deps is MappedOperator.DEFAULT_DEPS
   
       # TODO: add some more asserts here
   ```
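   
   For readers unfamiliar with the expected dict above: non-primitive values are wrapped in a `__type`/`__var` envelope. The following is only a toy sketch of that shape, not Airflow's serializer. `FakeXComRef` and `encode` are hypothetical names made up for illustration, and the rule that lists stay plain is simply copied from how `arg1` appears in the test.
   
   ```python
   from dataclasses import dataclass
   from typing import Any


   @dataclass
   class FakeXComRef:
       """Hypothetical stand-in for an XComArg; not an Airflow class."""

       task_id: str
       key: str


   def encode(value: Any) -> Any:
       """Toy encoder that mimics the envelope shape seen in the expected dict."""
       if value is None or isinstance(value, (str, int, float, bool)):
           # Primitives are stored as-is.
           return value
       if isinstance(value, list):
           # Lists stay plain lists; only their items are encoded.
           return [encode(item) for item in value]
       if isinstance(value, dict):
           return {"__type": "dict", "__var": {k: encode(v) for k, v in value.items()}}
       if isinstance(value, FakeXComRef):
           return {
               "__type": "xcomref",
               "__var": {"task_id": value.task_id, "key": value.key},
           }
       raise TypeError(f"cannot encode {type(value).__name__}")


   mapped_kwargs = {
       "arg1": [1, 2, {"a": "b"}],
       "arg2": FakeXComRef(task_id="op1", key="test_key"),
   }
   encoded = {name: encode(value) for name, value in mapped_kwargs.items()}
   ```
   
   With these assumptions, `encoded` has the same structure as the `mapped_kwargs` entry in the assertion, which makes it easier to see what the real serializer would need to emit for a mapped decorated task.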
   
   Calling `real_op.unmap()` also raises a `KeyError`:
   
   
   ```
   tests/serialization/test_dag_serialization.py:1670: in test_mapped_decorator_serde
       real_op.unmap()
   airflow/models/baseoperator.py:1882: in unmap
       dag._remove_task(self.task_id)
   airflow/models/dag.py:2167: in _remove_task
       task = self.task_dict.pop(task_id)
   E   KeyError: 'x'
   ```
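   
   The traceback ends in `self.task_dict.pop(task_id)`, which suggests that no task with the id `x` was present in the DAG's `task_dict` when `unmap()` ran; why it was missing is not something the traceback shows. A minimal sketch of that failure mode in plain Python, where `FakeDag` is a hypothetical stand-in and not Airflow code:
   
   ```python
   class FakeDag:
       """Hypothetical stand-in for a DAG; it only models the task_dict lookup."""

       def __init__(self):
           self.task_dict = {}

       def add_task(self, task_id):
           self.task_dict[task_id] = object()

       def remove_task(self, task_id):
           # Same pattern as the last frame of the traceback:
           # dict.pop() without a default raises KeyError for a missing key.
           return self.task_dict.pop(task_id)


   dag = FakeDag()
   dag.add_task("op1")  # only "op1" is registered, "x" never is

   try:
       dag.remove_task("x")
       error = None
   except KeyError as exc:
       error = exc
   ```
   
   Here `error` holds the `KeyError` for `'x'`, while `op1` is left untouched in `task_dict`.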

