Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/02/04 14:29:29 UTC

[GitHub] [airflow] uranusjr opened a new pull request #21328: Rewrite decorated task mapping

uranusjr opened a new pull request #21328:
URL: https://github.com/apache/airflow/pull/21328


   Need to get #21210 merged first and rebase this.
   
   This rewrites how `map()` is implemented on `_TaskDecorator`. The previous implementation incorrectly assumed that mapping a task decorator works like mapping a traditional operator, but the two have entirely different semantics.
   
   When mapping a traditional operator, `FooOperator.map(my_arg=[1, 2, 3])`, an argument of _`FooOperator`_ itself is mapped. But when mapping a task decorator, `my_task.map(my_val=[1, 2, 3])`, it is an argument of _the function wrapped by the task object_ that is mapped. Therefore, mapped (and also partial-ed) values such as `my_val` should go into the `DecoratedOperator`’s `op_kwargs` argument (or `op_args`, for positional arguments) instead.
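The routing described above can be sketched in plain Python without importing Airflow. `route_call_args` is a hypothetical helper, not Airflow's API; it assumes, as the serialization test in this PR suggests, that positional arguments to `partial()`/`map()` land in `op_args`, keyword arguments land in `op_kwargs`, and operator-level arguments from `@task(...)` (such as `retry_delay`) stay at the top level. The XComArg is replaced by a plain string stand-in.

```python
from typing import Any, Dict, Optional, Tuple


def route_call_args(
    args: Tuple[Any, ...],
    kwargs: Dict[str, Any],
    operator_kwargs: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: put the wrapped function's arguments into
    op_args/op_kwargs, next to (not mixed with) operator-level kwargs."""
    routed = dict(operator_kwargs or {})
    routed["op_args"] = list(args)      # positional args of the wrapped function
    routed["op_kwargs"] = dict(kwargs)  # keyword args of the wrapped function
    return routed


# Mirrors: @task(retry_delay=30) plus
#   x.partial("foo", arg3=[...]).map({"a": 1, "b": 2}, arg4=xcomarg)
XCOMARG = "XComArg(op1, my_key)"  # string stand-in, no Airflow import

partial_kwargs = route_call_args(
    ("foo",), {"arg3": [1, 2, {"a": "b"}]}, operator_kwargs={"retry_delay": 30}
)
mapped_kwargs = route_call_args(({"a": 1, "b": 2},), {"arg4": XCOMARG})

print(partial_kwargs)
# {'retry_delay': 30, 'op_args': ['foo'], 'op_kwargs': {'arg3': [1, 2, {'a': 'b'}]}}
print(mapped_kwargs)
# {'op_args': [{'a': 1, 'b': 2}], 'op_kwargs': {'arg4': 'XComArg(op1, my_key)'}}
```

By contrast, for a traditional `FooOperator.map(my_arg=[1, 2, 3])`, `my_arg` is itself an operator argument, so it would be the top-level key in the mapped kwargs with no `op_kwargs` wrapper.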
   
   An end-to-end test is provided to validate the simplest case of this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #21328: Rewrite decorated task mapping

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #21328:
URL: https://github.com/apache/airflow/pull/21328#discussion_r802713213



##########
File path: tests/serialization/test_dag_serialization.py
##########
@@ -1654,6 +1654,59 @@ def test_mapped_operator_xcomarg_serde():
     assert xcom_arg.operator is serialized_dag.task_dict['op1']
 
 
+def test_mapped_decorator_serde():
+    from airflow.decorators import task
+    from airflow.models.xcom_arg import XComArg
+    from airflow.serialization.serialized_objects import _XComRef
+
+    with DAG("test-dag", start_date=datetime(2020, 1, 1)) as dag:
+        op1 = BaseOperator(task_id="op1")
+        xcomarg = XComArg(op1, "my_key")
+
+        @task(retry_delay=30)
+        def x(arg1, arg2, arg3, arg4):
+            print(arg1, arg2, arg3, arg4)
+
+        x.partial("foo", arg3=[1, 2, {"a": "b"}]).map({"a": 1, "b": 2}, arg4=xcomarg)
+
+    original = dag.get_task("x")
+
+    serialized = SerializedBaseOperator._serialize(original)
+    assert serialized == {
+        '_is_dummy': False,
+        '_is_mapped': True,
+        '_task_module': 'airflow.decorators.python',
+        '_task_type': '_PythonDecoratedOperator',
+        'downstream_task_ids': [],
+        'partial_kwargs': {
+            'op_args': ["foo"],
+            'op_kwargs': {'arg3': [1, 2, {"__type": "dict", "__var": {'a': 'b'}}]},
+            'retry_delay': 30,
+        },
+        'mapped_kwargs': {
+            'op_args': [{"__type": "dict", "__var": {'a': 1, 'b': 2}}],
+            'op_kwargs': {'arg4': {'__type': 'xcomref', '__var': {'task_id': 'op1', 'key': 'my_key'}}},
+        },
+        'task_id': 'x',
+        'template_ext': [],
+        'template_fields': ['op_args', 'op_kwargs'],
+    }
+
+    deserialized = SerializedBaseOperator.deserialize_operator(serialized)
+    assert isinstance(deserialized, MappedOperator)
+    assert deserialized.deps is MappedOperator.DEFAULT_DEPS
+
+    assert deserialized.mapped_kwargs == {
+        "op_args": [{"a": 1, "b": 2}],
+        "op_kwargs": {"arg4": _XComRef("op1", "my_key")},
+    }
+    assert deserialized.partial_kwargs == {
+        "retry_delay": 30,
+        "op_args": ["foo"],
+        "op_kwargs": {"arg3": [1, 2, {"a": "b"}]},
+    }
+

Review comment:
       I added a test in [`tests/decorators/test_python.py`](https://github.com/apache/airflow/pull/21328/files#diff-0b9655774f25122e8e80e20287067ed33879947519cec000528b104f60ed4fa7) for this
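The expected `serialized` dict in the test above wraps every `dict` value in a `{"__type": "dict", "__var": ...}` envelope and every XCom reference in a `{"__type": "xcomref", ...}` envelope, while lists and scalars stay plain and the `op_args`/`op_kwargs` containers themselves are not wrapped. The sketch below reproduces only that shape so the round trip can be checked in isolation; `XComRef`, `encode`, `decode` and the `*_call_kwargs` helpers are hypothetical stand-ins, not Airflow's `SerializedBaseOperator` or `_XComRef`.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class XComRef:
    """Hypothetical stand-in for a serialized reference to another task's XCom."""
    task_id: str
    key: str


def encode(value: Any) -> Any:
    """Wrap dicts and XCom references in a {"__type", "__var"} envelope."""
    if isinstance(value, dict):
        return {"__type": "dict", "__var": {k: encode(v) for k, v in value.items()}}
    if isinstance(value, XComRef):
        return {"__type": "xcomref", "__var": {"task_id": value.task_id, "key": value.key}}
    if isinstance(value, list):
        return [encode(v) for v in value]  # lists stay plain JSON arrays
    return value  # str, int, float, bool and None pass through unchanged


def decode(value: Any) -> Any:
    """Inverse of encode(); every dict reaching here is expected to be an envelope."""
    if isinstance(value, list):
        return [decode(v) for v in value]
    if isinstance(value, dict):
        kind, var = value["__type"], value["__var"]
        if kind == "dict":
            return {k: decode(v) for k, v in var.items()}
        if kind == "xcomref":
            return XComRef(**var)
        raise ValueError(f"unknown __type: {kind!r}")
    return value


def encode_call_kwargs(routed: Dict[str, Any]) -> Dict[str, Any]:
    """Encode each op_args item and op_kwargs value; the containers stay plain."""
    return {
        "op_args": [encode(v) for v in routed.get("op_args", [])],
        "op_kwargs": {k: encode(v) for k, v in routed.get("op_kwargs", {}).items()},
    }


def decode_call_kwargs(serialized: Dict[str, Any]) -> Dict[str, Any]:
    return {
        "op_args": [decode(v) for v in serialized.get("op_args", [])],
        "op_kwargs": {k: decode(v) for k, v in serialized.get("op_kwargs", {}).items()},
    }


mapped = {"op_args": [{"a": 1, "b": 2}], "op_kwargs": {"arg4": XComRef("op1", "my_key")}}
serialized = encode_call_kwargs(mapped)
assert decode_call_kwargs(serialized) == mapped  # lossless round trip
```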







[GitHub] [airflow] uranusjr commented on pull request #21328: Rewrite decorated task mapping

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #21328:
URL: https://github.com/apache/airflow/pull/21328#issuecomment-1034566556


   Static check failures fixed in #21480.





[GitHub] [airflow] uranusjr commented on pull request #21328: Rewrite decorated task mapping

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #21328:
URL: https://github.com/apache/airflow/pull/21328#issuecomment-1033535889


   We should also implement some validation when a DAG is parsed, to make sure users pass reasonable values to `partial_kwargs`. This will be done in a separate PR.
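One way such parse-time validation could work is sketched below using the standard library's `inspect.Signature.bind_partial`: the partial-ed and mapped arguments are bound together against the wrapped function's signature, so an unknown name, too many positional arguments, or a name supplied twice fails when the DAG is parsed rather than when the task runs. `validate_call_args` is hypothetical; the thread only says the real validation will come in a separate PR, and this sketch deliberately does not check for arguments that are still missing.

```python
import inspect
from typing import Any, Callable, Dict


def validate_call_args(
    func: Callable[..., Any],
    partial_kwargs: Dict[str, Any],
    mapped_kwargs: Dict[str, Any],
) -> None:
    """Hypothetical parse-time check; raises TypeError on a bad combination."""
    partial_op_kwargs = partial_kwargs.get("op_kwargs", {})
    mapped_op_kwargs = mapped_kwargs.get("op_kwargs", {})
    overlap = partial_op_kwargs.keys() & mapped_op_kwargs.keys()
    if overlap:
        raise TypeError(f"arguments both partial-ed and mapped: {sorted(overlap)}")
    args = [*partial_kwargs.get("op_args", []), *mapped_kwargs.get("op_args", [])]
    kwargs = {**partial_op_kwargs, **mapped_op_kwargs}
    # bind_partial() rejects unknown keyword names, surplus positional
    # arguments and names supplied both positionally and by keyword, but it
    # does not complain about parameters that are still missing.
    inspect.signature(func).bind_partial(*args, **kwargs)


def x(arg1, arg2, arg3, arg4):
    print(arg1, arg2, arg3, arg4)


# Same shape as the serialization test in this PR: passes silently.
validate_call_args(
    x,
    {"op_args": ["foo"], "op_kwargs": {"arg3": [1, 2, {"a": "b"}]}},
    {"op_args": [{"a": 1, "b": 2}], "op_kwargs": {"arg4": "<xcomarg>"}},
)

# A typo in a keyword name is caught at parse time instead of at run time.
try:
    validate_call_args(x, {"op_kwargs": {"arg9": 1}}, {})
except TypeError as exc:
    print("rejected:", exc)
```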





[GitHub] [airflow] uranusjr merged pull request #21328: Rewrite decorated task mapping

Posted by GitBox <gi...@apache.org>.
uranusjr merged pull request #21328:
URL: https://github.com/apache/airflow/pull/21328


   





[GitHub] [airflow] uranusjr commented on a change in pull request #21328: Rewrite decorated task mapping

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #21328:
URL: https://github.com/apache/airflow/pull/21328#discussion_r802643852




Review comment:
       At this point `retry_delay` is still `30` verbatim; it becomes a timedelta only after the operator is unmapped. I’ll add a test elsewhere for this.







[GitHub] [airflow] ashb commented on a change in pull request #21328: Rewrite decorated task mapping

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21328:
URL: https://github.com/apache/airflow/pull/21328#discussion_r802928681




Review comment:
       Yeah agreed. We can fix this up later, this unblocks a lot.







[GitHub] [airflow] uranusjr commented on a change in pull request #21328: Rewrite decorated task mapping

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #21328:
URL: https://github.com/apache/airflow/pull/21328#discussion_r802816045




Review comment:
       > `start_date` for instance.
   
   Hm, some refactoring would be needed to extract that logic out of BaseOperator for reuse. (Note that this affects non-decorator MappedOperator as well.) I think this should be done in a separate PR.
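A sketch of what extracting that logic could look like: one table of per-argument coercions that both an operator constructor and the deserializer could call, so `partial_kwargs` comes back with `retry_delay` as a `timedelta` and `start_date` as a `datetime` instead of the raw serialized values. `PARTIAL_KWARG_COERCIONS` and `coerce_partial_kwargs` are hypothetical names, and the rules (a bare number means seconds, strings are ISO 8601, naive datetimes are treated as UTC) are assumptions based on this discussion, not Airflow's actual behavior.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Dict


def _to_timedelta(value: Any) -> timedelta:
    # Assumed rule: a bare number means seconds, as in @task(retry_delay=30).
    if isinstance(value, timedelta):
        return value
    return timedelta(seconds=value)


def _to_datetime(value: Any) -> datetime:
    # Assumed rules: strings are ISO 8601; naive values are treated as UTC.
    if isinstance(value, str):
        value = datetime.fromisoformat(value)
    if value.tzinfo is None:
        value = value.replace(tzinfo=timezone.utc)
    return value


# Hypothetical shared table: operator argument name -> coercion function.
PARTIAL_KWARG_COERCIONS: Dict[str, Callable[[Any], Any]] = {
    "retry_delay": _to_timedelta,
    "start_date": _to_datetime,
}


def coerce_partial_kwargs(partial_kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy with known operator arguments converted to runtime types.

    op_args/op_kwargs hold the wrapped function's own values, so they are
    never coerced here.
    """
    return {
        key: PARTIAL_KWARG_COERCIONS[key](value) if key in PARTIAL_KWARG_COERCIONS else value
        for key, value in partial_kwargs.items()
    }


raw = {"retry_delay": 30, "start_date": "2020-01-01T00:00:00", "op_args": ["foo"]}
coerced = coerce_partial_kwargs(raw)
print(coerced["retry_delay"])  # 0:00:30
```

Keeping the table outside any one class is the point of the design: a mapped operator could apply it during deserialization without constructing a full `BaseOperator`.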







[GitHub] [airflow] ashb commented on a change in pull request #21328: Rewrite decorated task mapping

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21328:
URL: https://github.com/apache/airflow/pull/21328#discussion_r802460441




Review comment:
       ```suggestion
   
       assert deserialized.retry_delay == timedelta(seconds=30)
   ```
   
   (give or take.)







[GitHub] [airflow] uranusjr commented on pull request #21328: Rewrite decorated task mapping

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #21328:
URL: https://github.com/apache/airflow/pull/21328#issuecomment-1033535186


   Serialisation format is revised. The KeyError is also fixed by implementing slightly smarter logic to merge `op_kwargs` from `mapped_kwargs` and `partial_kwargs`.
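What merging `op_kwargs` from `mapped_kwargs` and `partial_kwargs` has to get right can be sketched as below. A plain `{**partial_kwargs, **mapped_kwargs}` would let the mapped `op_kwargs` dict replace the partial one wholesale and silently drop `arg3`, so the sketch merges one level deeper. `merge_decorator_kwargs` is hypothetical, not the function added in this PR; it assumes the mapped values have already been resolved for a single task instance, that partial-ed positional arguments come before mapped ones, and that mapped operator-level kwargs win on conflict.

```python
from typing import Any, Dict


def merge_decorator_kwargs(
    partial_kwargs: Dict[str, Any], mapped_kwargs: Dict[str, Any]
) -> Dict[str, Any]:
    """Hypothetical merge of partial and (resolved) mapped kwargs for one instance."""
    # Operator-level kwargs (e.g. retry_delay): shallow merge, mapped wins (assumption).
    merged = {**partial_kwargs, **mapped_kwargs}
    # Positional args of the wrapped function: partial-ed ones first, then mapped ones.
    merged["op_args"] = [*partial_kwargs.get("op_args", []), *mapped_kwargs.get("op_args", [])]
    # Keyword args of the wrapped function: union, refusing to silently override.
    partial_op_kwargs = partial_kwargs.get("op_kwargs", {})
    mapped_op_kwargs = mapped_kwargs.get("op_kwargs", {})
    duplicated = partial_op_kwargs.keys() & mapped_op_kwargs.keys()
    if duplicated:
        raise ValueError(f"op_kwargs passed to both partial() and map(): {sorted(duplicated)}")
    merged["op_kwargs"] = {**partial_op_kwargs, **mapped_op_kwargs}
    return merged


partial = {"retry_delay": 30, "op_args": ["foo"], "op_kwargs": {"arg3": [1, 2]}}
mapped = {"op_args": ["bar"], "op_kwargs": {"arg4": 42}}
print(merge_decorator_kwargs(partial, mapped))
# {'retry_delay': 30, 'op_args': ['foo', 'bar'], 'op_kwargs': {'arg3': [1, 2], 'arg4': 42}}
```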





[GitHub] [airflow] ashb commented on pull request #21328: Rewrite decorated task mapping

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #21328:
URL: https://github.com/apache/airflow/pull/21328#issuecomment-1031573948


   The serialization needs some work for this approach. Here is an incomplete test:
   
   ```python
   def test_mapped_decorator_serde():
       from airflow.models.xcom_arg import XComArg
       from airflow.decorators import task
   
       with DAG("test-dag", start_date=datetime(2020, 1, 1)) as dag:
           task1 = BaseOperator(task_id="op1")
           xcomarg = XComArg(task1, "test_key")
   
           @task(retry_delay=30)
           def x(arg1, arg2):
               ...
   
       real_op = x.partial(arg1=1).map(arg2=xcomarg).operator
   
       serialized = SerializedBaseOperator._serialize(real_op)
   
       assert serialized == {
           '_is_dummy': False,
           '_is_mapped': True,
           '_task_module': '???',
           '_task_type': '???',
           'downstream_task_ids': [],
           'mapped_kwargs': {
               'arg1': [
                   1,
                   2,
                   {"__type": "dict", "__var": {'a': 'b'}},
               ],
               'arg2': {'__type': 'xcomref', '__var': {'task_id': 'op1', 'key': 'test_key'}},
           },
           'task_id': 'x',
           # We don't want to include the python source code in the serialized representation
           # TODO? Where does `retry_delay` go? We need to separate 
       }
   
       op = SerializedBaseOperator.deserialize_operator(serialized)
       assert isinstance(op, MappedOperator)
       assert op.deps is MappedOperator.DEFAULT_DEPS
   
       # TODO: add some more asserts here
   ```
   
   Trying to call `real_op.unmap()` also throws a key error:
   
   
   ```
   tests/serialization/test_dag_serialization.py:1670: in test_mapped_decorator_serde
       real_op.unmap()
   airflow/models/baseoperator.py:1882: in unmap
       dag._remove_task(self.task_id)
   airflow/models/dag.py:2167: in _remove_task
       task = self.task_dict.pop(task_id)
   E   KeyError: 'x'
   ```





[GitHub] [airflow] ashb commented on a change in pull request #21328: Rewrite decorated task mapping

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21328:
URL: https://github.com/apache/airflow/pull/21328#discussion_r802663673




Review comment:
       `start_date` for instance.









[GitHub] [airflow] ashb commented on a change in pull request #21328: Rewrite decorated task mapping

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #21328:
URL: https://github.com/apache/airflow/pull/21328#discussion_r802659706




Review comment:
       It should be deserialized as a timedelta. It's fine for _this_ value since the BaseOperator constructor handles it, but other arguments might behave differently.







[GitHub] [airflow] ashb edited a comment on pull request #21328: Rewrite decorated task mapping

Posted by GitBox <gi...@apache.org>.
ashb edited a comment on pull request #21328:
URL: https://github.com/apache/airflow/pull/21328#issuecomment-1031573948


   The serialization needs some work for this approach. Here is an incomplete test:
   
   ```python
   def test_mapped_decorator_serde():
       from airflow.models.xcom_arg import XComArg
       from airflow.decorators import task
   
       with DAG("test-dag", start_date=datetime(2020, 1, 1)) as dag:
           task1 = BaseOperator(task_id="op1")
           xcomarg = XComArg(task1, "test_key")
   
           @task(retry_delay=30)
           def x(arg1, arg2):
               ...
   
       real_op = x.partial(arg1=1).map(arg2=xcomarg).operator
   
       serialized = SerializedBaseOperator._serialize(real_op)
   
       assert serialized == {
           '_is_dummy': False,
           '_is_mapped': True,
           '_task_module': 'airflow.decorators.python',
           '_task_type': '_PythonDecoratedOperator',
           'downstream_task_ids': [],
           'partial_kwargs': {
               'multiple_outputs': False,
               'op_args': [],
               'op_kwargs': {
                   'arg1': [
                       1,
                       2,
                       {"__type": "dict", "__var": {'a': 'b'}},
                   ],
               },
               'retry_delay': 30,
           },
           'mapped_kwargs': {
               'op_args': [],
               # We don't need the __type/__var here!
               'op_kwargs': {
                   'arg2': {'__type': 'xcomref', '__var': {'task_id': 'op1', 'key': 'test_key'}},
               }
           },
           'task_id': 'x',
           'template_ext': [],
           'template_fields': ['op_args', 'op_kwargs'],
           # We don't want to include the python source code in the serialized representation
           # TODO? Where does `retry_delay` go? We need to separate 
       }
   
   
       op = SerializedBaseOperator.deserialize_operator(serialized)
       assert isinstance(op, MappedOperator)
       assert op.deps is MappedOperator.DEFAULT_DEPS
   
       # TODO: add some more asserts here
   ```
   
   The serialization
   
   Trying to call `real_op.unmap()` also throws a key error:
   
   
   ```
   tests/serialization/test_dag_serialization.py:1670: in test_mapped_decorator_serde
       real_op.unmap()
   airflow/models/baseoperator.py:1882: in unmap
       dag._remove_task(self.task_id)
   airflow/models/dag.py:2167: in _remove_task
       task = self.task_dict.pop(task_id)
   E   KeyError: 'x'
   ```





[GitHub] [airflow] github-actions[bot] commented on pull request #21328: Rewrite decorated task mapping

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #21328:
URL: https://github.com/apache/airflow/pull/21328#issuecomment-1034026888


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

