Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/01/18 18:08:37 UTC

[GitHub] [airflow] ashb opened a new pull request #20931: Set dependencies in MappedOperator via XComArgs

ashb opened a new pull request #20931:
URL: https://github.com/apache/airflow/pull/20931


   Set upstream dependencies when an XComArg is used in a MappedOperator, and (de)serialize them correctly.
   
   We can only re-create an XComArg at the DAG level, because we need to get hold of the Operator itself (not just a task_id). That calls for a two-phase approach: when deserializing operators we create a placeholder object (`_XcomRef`), and then, one level up, when deserializing the DAG, we turn these placeholders back into XComArg objects.
   
   (In doing so we also had to fix a bug or two in serializing a MappedOperator that has a DAG: it caused a recursion error.)
   
   This PR could possibly be split into two (one to set deps, a second to serialize them).
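
A minimal sketch of the two-phase approach described above, using simplified stand-ins rather than Airflow's real classes. `XcomRef`, `Task`, `ResolvedArg`, `deserialize_task`, and `deserialize_dag` are hypothetical names for illustration only; the `xcomref` type tag and the `task_id`/`key` fields are assumed to match the serialized form shown in the test later in this thread.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, NamedTuple


class XcomRef(NamedTuple):
    """Placeholder: all that is known while a single operator is decoded."""
    task_id: str
    key: str


@dataclass
class Task:
    """Hypothetical stand-in for an operator (not Airflow's class)."""
    task_id: str
    mapped_kwargs: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ResolvedArg:
    """Hypothetical stand-in for XComArg: holds the operator object itself."""
    operator: Task
    key: str


def deserialize_task(encoded: Dict[str, Any]) -> Task:
    # Phase 1: the referenced task may not exist yet, so keep (task_id, key).
    kwargs = {}
    for name, value in encoded.get("mapped_kwargs", {}).items():
        if isinstance(value, dict) and value.get("__type") == "xcomref":
            value = XcomRef(**value["__var"])
        kwargs[name] = value
    return Task(task_id=encoded["task_id"], mapped_kwargs=kwargs)


def deserialize_dag(encoded_tasks: List[Dict[str, Any]]) -> Dict[str, Task]:
    tasks = {t.task_id: t for t in map(deserialize_task, encoded_tasks)}
    # Phase 2: every task now exists, so placeholders can be resolved.
    for task in tasks.values():
        for name, value in task.mapped_kwargs.items():
            if isinstance(value, XcomRef):
                task.mapped_kwargs[name] = ResolvedArg(tasks[value.task_id], value.key)
    return tasks
```

The second pass is what lets a task refer to another task that appears later in the serialized list, since the lookup by `task_id` only happens once all tasks have been created.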
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #20931: Set dependencies in MappedOperator via XComArgs

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #20931:
URL: https://github.com/apache/airflow/pull/20931#discussion_r787576941



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -970,6 +997,14 @@ def deserialize_dag(cls, encoded_dag: Dict[str, Any]) -> 'SerializedDAG':
             if serializable_task.subdag is not None:
                 setattr(serializable_task.subdag, 'parent_dag', dag)
 
+            if isinstance(task, MappedOperator):
+                for d in (task.mapped_kwargs, task.partial_kwargs):
+                    for k in d:
+                        if not isinstance(d[k], _XcomRef):
+                            continue
+
+                        d[k] = XComArg(operator=dag.get_task(d[k].task_id), key=d[k].key)

Review comment:
       ```suggestion
                       for k, v in d.items():
                           if not isinstance(v, _XcomRef):
                               continue
   
                           d[k] = XComArg(operator=dag.get_task(v.task_id), key=v.key)
   ```
   I wonder whether this would be easier to read.







[GitHub] [airflow] uranusjr commented on a change in pull request #20931: Set dependencies in MappedOperator via XComArgs

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20931:
URL: https://github.com/apache/airflow/pull/20931#discussion_r788393469



##########
File path: airflow/models/xcom_arg.py
##########
@@ -59,9 +61,9 @@ class XComArg(DependencyMixin):
     :type key: str
     """
 
-    def __init__(self, operator: Union[BaseOperator, MappedOperator], key: str = XCOM_RETURN_KEY):
-        self._operator = operator
-        self._key = key
+    def __init__(self, operator: "Union[BaseOperator, MappedOperator]", key: str = XCOM_RETURN_KEY):

Review comment:
       After #20945 we can probably change this to `Operator`, which would not cause import cycles.







[GitHub] [airflow] ashb commented on a change in pull request #20931: Set dependencies in MappedOperator via XComArgs

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20931:
URL: https://github.com/apache/airflow/pull/20931#discussion_r787673127



##########
File path: airflow/models/xcom_arg.py
##########
@@ -59,9 +61,9 @@ class XComArg(DependencyMixin):
     :type key: str
     """
 
-    def __init__(self, operator: Union[BaseOperator, MappedOperator], key: str = XCOM_RETURN_KEY):
-        self._operator = operator
-        self._key = key
+    def __init__(self, operator: "Union[BaseOperator, MappedOperator]", key: str = XCOM_RETURN_KEY):

Review comment:
       To avoid unnecessary imports and reduce the chance of import cycles.
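
As a rough illustration of why the quoted annotation helps: a string annotation is not evaluated when the function is defined, so the names inside it do not have to be imported at runtime. The sketch below is an assumption-laden example, not Airflow's code; `heavy_module`, `Arg`, and the `"return_value"` default are hypothetical placeholders.

```python
from typing import TYPE_CHECKING, Union

if TYPE_CHECKING:
    # Seen only by static type checkers; never executed at runtime,
    # so it cannot take part in an import cycle.
    # `heavy_module` is a hypothetical module name.
    from heavy_module import BaseOperator, MappedOperator


class Arg:
    """Hypothetical class; only the annotation style is the point here."""

    def __init__(self, operator: "Union[BaseOperator, MappedOperator]", key: str = "return_value"):
        # The quoted annotation is stored as a plain string and never
        # evaluated here, so the missing runtime import is harmless.
        self.operator = operator
        self.key = key
```

The trade-off is that anything inspecting annotations at runtime has to resolve the string itself.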







[GitHub] [airflow] uranusjr commented on a change in pull request #20931: Set dependencies in MappedOperator via XComArgs

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20931:
URL: https://github.com/apache/airflow/pull/20931#discussion_r787564685



##########
File path: airflow/models/baseoperator.py
##########
@@ -1734,6 +1721,8 @@ def __repr__(self) -> str:
     params: Union[ParamsDict, dict] = attr.ib(factory=ParamsDict)
     template_fields: Iterable[str] = attr.ib()
 
+    subdag: None = attr.ib(init=False)

Review comment:
       Is this used?







[GitHub] [airflow] ashb commented on a change in pull request #20931: Set dependencies in MappedOperator via XComArgs

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20931:
URL: https://github.com/apache/airflow/pull/20931#discussion_r787569762



##########
File path: airflow/models/baseoperator.py
##########
@@ -1734,6 +1721,8 @@ def __repr__(self) -> str:
     params: Union[ParamsDict, dict] = attr.ib(factory=ParamsDict)
     template_fields: Iterable[str] = attr.ib()
 
+    subdag: None = attr.ib(init=False)

Review comment:
       This may be fixed, or no longer needed, after your #20945?







[GitHub] [airflow] kaxil commented on a change in pull request #20931: Set dependencies in MappedOperator via XComArgs

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #20931:
URL: https://github.com/apache/airflow/pull/20931#discussion_r787608798



##########
File path: airflow/serialization/enums.py
##########
@@ -48,3 +48,4 @@ class DagAttributeTypes(str, Enum):
     TASK_GROUP = 'taskgroup'
     EDGE_INFO = 'edgeinfo'
     PARAM = 'param'
+    XCOM_REF = 'xcom_ref'

Review comment:
       nit -- no strong opinion
   
   ```suggestion
       XCOM_REF = 'xcomref'
   ```
   
   to follow the same convention as the other enum items
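
For context, a small sketch of how a `str`-based `Enum` like the one in this hunk behaves. `AttrType` is a hypothetical stand-in that copies only the members visible in the diff, with the suggested `'xcomref'` value; it is not the real `DagAttributeTypes` class.

```python
from enum import Enum


class AttrType(str, Enum):
    """Hypothetical stand-in for the enum shown in the hunk above."""
    TASK_GROUP = 'taskgroup'
    EDGE_INFO = 'edgeinfo'
    PARAM = 'param'
    XCOM_REF = 'xcomref'


# Because of the str mixin, members compare equal to their plain string
# values, so a decoded "__type" string can be compared directly.
is_ref = {'__type': 'xcomref'}['__type'] == AttrType.XCOM_REF

# Looking a member up by value returns the existing member.
member = AttrType('xcomref')
```

Since the value ends up in serialized output, the choice between `'xcom_ref'` and `'xcomref'` is visible in stored data, which is one reason to settle on a single convention early.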







[GitHub] [airflow] ashb commented on a change in pull request #20931: Set dependencies in MappedOperator via XComArgs

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20931:
URL: https://github.com/apache/airflow/pull/20931#discussion_r787673672



##########
File path: airflow/serialization/enums.py
##########
@@ -48,3 +48,4 @@ class DagAttributeTypes(str, Enum):
     TASK_GROUP = 'taskgroup'
     EDGE_INFO = 'edgeinfo'
     PARAM = 'param'
+    XCOM_REF = 'xcom_ref'

Review comment:
       Good point -- better to be consistent.

##########
File path: tests/serialization/test_dag_serialization.py
##########
@@ -1590,14 +1593,58 @@ def test_mapped_operator_serde():
                 {"__type": "dict", "__var": {'a': 'b'}},
             ]
         },
-        'partial_kwargs': {},
+        'partial_kwargs': {
+            'executor_config': {
+                '__type': 'dict',
+                '__var': {
+                    'dict': {"__type": "dict", "__var": {'sub': 'value'}},
+                },
+            },
+        },
         'task_id': 'a',
         'template_fields': ['bash_command', 'env'],
     }
 
     op = SerializedBaseOperator.deserialize_operator(serialized)
+    assert isinstance(op, MappedOperator)
 
     assert op.operator_class == "airflow.operators.bash.BashOperator"
+    assert op.mapped_kwargs['bash_command'] == literal
+    assert op.partial_kwargs['executor_config'] == {'dict': {'sub': 'value'}}
+
+
+def test_mapped_operator_xcomarg_serde():
+    from airflow.models.xcom_arg import XComArg
+
+    with DAG("test-dag", start_date=datetime(2020, 1, 1)) as dag:
+        task1 = BaseOperator(task_id="op1")
+        xcomarg = XComArg(task1, "test_key")
+        mapped = MockOperator(task_id='task_2').map(arg2=xcomarg)
+
+    serialized = SerializedBaseOperator._serialize(mapped)
+    assert serialized == {
+        '_is_dummy': False,
+        '_is_mapped': True,
+        '_task_module': 'tests.test_utils.mock_operators',
+        '_task_type': 'MockOperator',
+        'downstream_task_ids': [],
+        'mapped_kwargs': {'arg2': {'__type': 'xcom_ref', '__var': {'task_id': 'op1', 'key': 'test_key'}}},

Review comment:
       ```suggestion
           'mapped_kwargs': {'arg2': {'__type': 'xcomref', '__var': {'task_id': 'op1', 'key': 'test_key'}}},
   ```







[GitHub] [airflow] ashb commented on a change in pull request #20931: Set dependencies in MappedOperator via XComArgs

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20931:
URL: https://github.com/apache/airflow/pull/20931#discussion_r787578055



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -970,6 +997,14 @@ def deserialize_dag(cls, encoded_dag: Dict[str, Any]) -> 'SerializedDAG':
             if serializable_task.subdag is not None:
                 setattr(serializable_task.subdag, 'parent_dag', dag)
 
+            if isinstance(task, MappedOperator):
+                for d in (task.mapped_kwargs, task.partial_kwargs):
+                    for k in d:
+                        if not isinstance(d[k], _XcomRef):
+                            continue
+
+                        d[k] = XComArg(operator=dag.get_task(d[k].task_id), key=d[k].key)

Review comment:
       Oh yes, probably. Last night I was thinking that we can't change the dict in place, but that restriction only applies to adding or removing items; changing the values of existing keys is fine.
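
The distinction can be checked with a small standalone example. The function names below are made up for illustration and are unrelated to the PR's code.

```python
def replace_values(d):
    # Assigning to an existing key keeps the set of keys (and the dict's
    # size) unchanged, so iterating over items() at the same time is safe.
    for k, v in d.items():
        if isinstance(v, int):
            d[k] = str(v)
    return d


def add_key_while_iterating(d):
    # Inserting a new key changes the dict's size mid-iteration; CPython
    # detects this and raises RuntimeError on the next iteration step.
    for k in d:
        d[k + "_copy"] = d[k]
    return d
```

Calling `replace_values({"a": 1, "b": "x"})` completes normally, while `add_key_while_iterating({"a": 1})` raises `RuntimeError`.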







[GitHub] [airflow] ashb commented on a change in pull request #20931: Set dependencies in MappedOperator via XComArgs

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20931:
URL: https://github.com/apache/airflow/pull/20931#discussion_r787568519



##########
File path: airflow/models/baseoperator.py
##########
@@ -1734,6 +1721,8 @@ def __repr__(self) -> str:
     params: Union[ParamsDict, dict] = attr.ib(factory=ParamsDict)
     template_fields: Iterable[str] = attr.ib()
 
+    subdag: None = attr.ib(init=False)

Review comment:
       It is used, albeit "badly", by the serialization code: https://github.com/apache/airflow/blob/14a057ff921d7e6ceb70326f5fecac29d3a093ad/airflow/serialization/serialized_objects.py#L970-L971







[GitHub] [airflow] github-actions[bot] commented on pull request #20931: Set dependencies in MappedOperator via XComArgs

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #20931:
URL: https://github.com/apache/airflow/pull/20931#issuecomment-1016283230


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] kaxil commented on a change in pull request #20931: Set dependencies in MappedOperator via XComArgs

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #20931:
URL: https://github.com/apache/airflow/pull/20931#discussion_r787606435



##########
File path: airflow/models/xcom_arg.py
##########
@@ -59,9 +61,9 @@ class XComArg(DependencyMixin):
     :type key: str
     """
 
-    def __init__(self, operator: Union[BaseOperator, MappedOperator], key: str = XCOM_RETURN_KEY):
-        self._operator = operator
-        self._key = key
+    def __init__(self, operator: "Union[BaseOperator, MappedOperator]", key: str = XCOM_RETURN_KEY):

Review comment:
       ```suggestion
       def __init__(self, operator: Union[BaseOperator, MappedOperator], key: str = XCOM_RETURN_KEY):
   ```
   
   Any reason for this change?







[GitHub] [airflow] uranusjr merged pull request #20931: Set dependencies in MappedOperator via XComArgs

Posted by GitBox <gi...@apache.org>.
uranusjr merged pull request #20931:
URL: https://github.com/apache/airflow/pull/20931


   

