Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/13 22:57:10 UTC

[GitHub] [airflow] ashb opened a new pull request, #22975: Resolve XComArgs before trying to unmap MappedOperators

ashb opened a new pull request, #22975:
URL: https://github.com/apache/airflow/pull/22975

   Many operators do some type validation inside `__init__`
   (DateTimeSensor, for instance, requires a str or a datetime),
   so they fail when mapped because they receive an XComArg instead.
   
   To fix this we have had to change the order in which we unmap and
   resolve templates:
   
   - First we get the unmap kwargs and resolve the expansion/mapping
     args in them
   - Then we create the operator (this should fix the problem of the
     constructor receiving an XComArg)
   - Then we render templates, but only for values that _weren't_
     expanded already
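
The reordering described above can be sketched with toy stand-ins. Everything in this block (`FakeXComArg`, `StrictSensor`, `resolve_kwargs`, `render_remaining`) is hypothetical and only illustrates the ordering; it is not Airflow's implementation, and `str.format` merely stands in for Jinja rendering.

```python
from datetime import datetime
from typing import Any, Dict, Set, Tuple


class FakeXComArg:
    """Hypothetical placeholder for a value that is only known at run time."""

    def __init__(self, value: Any) -> None:
        self._value = value

    def resolve(self) -> Any:
        return self._value


class StrictSensor:
    """Hypothetical operator that validates its argument inside __init__."""

    template_fields = ("target_time", "note")

    def __init__(self, target_time: Any, note: str = "") -> None:
        if not isinstance(target_time, (str, datetime)):
            raise TypeError(f"expected str or datetime, got {type(target_time).__name__}")
        self.target_time = target_time
        self.note = note


def resolve_kwargs(kwargs: Dict[str, Any]) -> Tuple[Dict[str, Any], Set[str]]:
    """Step 1: replace placeholders with real values and remember which keys changed."""
    resolved: Dict[str, Any] = {}
    resolved_keys: Set[str] = set()
    for key, value in kwargs.items():
        if isinstance(value, FakeXComArg):
            value = value.resolve()
            resolved_keys.add(key)
        resolved[key] = value
    return resolved, resolved_keys


def render_remaining(op: StrictSensor, resolved_keys: Set[str], context: Dict[str, str]) -> None:
    """Step 3: render only the template fields that were not already resolved."""
    for name in op.template_fields:
        if name in resolved_keys:
            continue
        value = getattr(op, name)
        if isinstance(value, str):
            setattr(op, name, value.format(**context))


mapped_kwargs = {"target_time": FakeXComArg("{ds}"), "note": "run for {ds}"}

# Old order: the constructor receives the placeholder and its validation fails.
try:
    StrictSensor(**mapped_kwargs)
    old_order_failed = False
except TypeError:
    old_order_failed = True

# New order: resolve, then construct (step 2), then render what is left.
kwargs, resolved_keys = resolve_kwargs(mapped_kwargs)
sensor = StrictSensor(**kwargs)
render_remaining(sensor, resolved_keys, {"ds": "2022-04-13"})
```

In this sketch the value that came out of the placeholder is left untouched by rendering, while the plain `note` field is still rendered.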
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ashb commented on a diff in pull request #22975: Resolve XComArg's before trying to unmap MappedOperators

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22975:
URL: https://github.com/apache/airflow/pull/22975#discussion_r849653586


##########
airflow/models/xcom_arg.py:
##########
@@ -136,12 +139,15 @@ def set_downstream(
         """Proxy to underlying operator set_downstream method. Required by TaskMixin."""
         self.operator.set_downstream(task_or_task_list, edge_modifier)
 
-    def resolve(self, context: Context) -> Any:
+    @provide_session
+    def resolve(self, context: Context, session: "Session" = NEW_SESSION) -> Any:

Review Comment:
   What interface issues might that be?





[GitHub] [airflow] uranusjr commented on a diff in pull request #22975: Resolve XComArg's before trying to unmap MappedOperators

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #22975:
URL: https://github.com/apache/airflow/pull/22975#discussion_r849728476


##########
airflow/models/xcom_arg.py:
##########
@@ -136,12 +139,15 @@ def set_downstream(
         """Proxy to underlying operator set_downstream method. Required by TaskMixin."""
         self.operator.set_downstream(task_or_task_list, edge_modifier)
 
-    def resolve(self, context: Context) -> Any:
+    @provide_session
+    def resolve(self, context: Context, session: "Session" = NEW_SESSION) -> Any:

Review Comment:
   Don’t quite remember now; maybe it has something to do with DagParam also having `resolve`. I guess we can do it and see what happens.





[GitHub] [airflow] github-actions[bot] commented on pull request #22975: Resolve XComArg's before trying to unmap MappedOperators

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22975:
URL: https://github.com/apache/airflow/pull/22975#issuecomment-1098320932

   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.




[GitHub] [airflow] uranusjr commented on a diff in pull request #22975: Resolve XComArg's before trying to unmap MappedOperators

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #22975:
URL: https://github.com/apache/airflow/pull/22975#discussion_r849632316


##########
airflow/models/mappedoperator.py:
##########
@@ -686,33 +693,40 @@ def render_template_fields(
         """
         if not jinja_env:
             jinja_env = self.get_template_env()
-        unmapped_task = self.unmap()
+        # Before we unmap we have to resolve the mapped arguments, otherwise the real operator constructor
+        # could be called with an XComArg, rather than the value it resolves to.
+        #
+        # We also need to resolve _all_ mapped arguments, even if they aren't marked as templated
+        kwargs = self._get_unmap_kwargs()
+
+        template_fields = set(self.template_fields)
+
+        self._resolve_expansion_kwargs(kwargs, template_fields, context, session)
+
+        unmapped_task = self.unmap(rendered_kwargs=kwargs)
         self._do_render_template_fields(
             parent=unmapped_task,
-            template_fields=unmapped_task.template_fields,
+            template_fields=template_fields,

Review Comment:
   I hope `template_fields` being unordered won’t cause bugs in user code.
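
If rendering order ever turned out to matter, one order-preserving alternative to `set(self.template_fields)` could look like the sketch below. The helper names and the field names are hypothetical, and this is not what the PR does.

```python
from typing import Iterable, List, Set


def ordered_unique(fields: Iterable[str]) -> List[str]:
    """Drop duplicates while keeping first-seen order (dicts preserve insertion order)."""
    return list(dict.fromkeys(fields))


def remaining_fields(template_fields: Iterable[str], already_resolved: Set[str]) -> List[str]:
    """Return the fields that still need rendering, in their declared order."""
    return [name for name in ordered_unique(template_fields) if name not in already_resolved]


# Hypothetical field names, with a duplicate to show de-duplication.
template_fields = ("sql", "parameters", "conn_id", "sql")
remaining = remaining_fields(template_fields, {"parameters"})
```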



##########
airflow/models/mappedoperator.py:
##########
@@ -670,10 +675,12 @@ def prepare_for_execution(self) -> "MappedOperator":
         # we don't need to create a copy of the MappedOperator here.
         return self
 
+    @provide_session
     def render_template_fields(
         self,
         context: Context,
         jinja_env: Optional["jinja2.Environment"] = None,
+        session: Session = NEW_SESSION,

Review Comment:
   We might not be able to do this because `render_template_fields` is public API that can be overridden. I think `@provide_session` can be put on `_resolve_expansion_kwargs` to avoid the problem.
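
One way this concern could show up is sketched below with toy classes (all names except `render_template_fields` and `_resolve_expansion_kwargs` are hypothetical, and `object()` stands in for a real session). A subclass written against the old two-argument signature breaks as soon as a caller passes `session=`, whereas keeping the session on a private helper leaves the public signature alone.

```python
from typing import Any, Dict, Optional


class BaseWithSession:
    """Variant where the public method grows a ``session`` parameter."""

    def render_template_fields(
        self, context: Dict[str, Any], jinja_env: Any = None, session: Optional[object] = None
    ) -> str:
        return "base"


class UserOperator(BaseWithSession):
    # User override written against the old signature, without ``session``.
    def render_template_fields(self, context: Dict[str, Any], jinja_env: Any = None) -> str:
        return "user"


try:
    UserOperator().render_template_fields({}, session=object())
    override_broke = False
except TypeError:
    override_broke = True


class BaseWithPrivateHelper:
    """Variant where only a private helper knows about the session."""

    def render_template_fields(self, context: Dict[str, Any], jinja_env: Any = None) -> str:
        return self._resolve_expansion_kwargs(context)

    def _resolve_expansion_kwargs(self, context: Dict[str, Any], session: Optional[object] = None) -> str:
        # Stand-in for what a session-providing decorator would do.
        session = session if session is not None else object()
        return "resolved"


class SaferUserOperator(BaseWithPrivateHelper):
    # The same style of override keeps working because the public signature is unchanged.
    def render_template_fields(self, context: Dict[str, Any], jinja_env: Any = None) -> str:
        return "user:" + super().render_template_fields(context, jinja_env)


safe_result = SaferUserOperator().render_template_fields({})
```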



##########
airflow/models/mappedoperator.py:
##########
@@ -686,33 +693,40 @@ def render_template_fields(
         """
         if not jinja_env:
             jinja_env = self.get_template_env()
-        unmapped_task = self.unmap()
+        # Before we unmap we have to resolve the mapped arguments, otherwise the real operator constructor
+        # could be called with an XComArg, rather than the value it resolves to.
+        #
+        # We also need to resolve _all_ mapped arguments, even if they aren't marked as templated
+        kwargs = self._get_unmap_kwargs()
+
+        template_fields = set(self.template_fields)
+
+        self._resolve_expansion_kwargs(kwargs, template_fields, context, session)
+
+        unmapped_task = self.unmap(rendered_kwargs=kwargs)
         self._do_render_template_fields(
             parent=unmapped_task,
-            template_fields=unmapped_task.template_fields,
+            template_fields=template_fields,
             context=context,
             jinja_env=jinja_env,
             seen_oids=set(),
         )
         return unmapped_task
 
-    def _render_template_field(

Review Comment:
   I think we can remove this in AbstractOperator now; the sole reason for this function’s existence is for MappedOperator to hook into.



##########
airflow/models/mappedoperator.py:
##########
@@ -569,6 +573,7 @@ def _get_map_lengths(self, run_id: str, *, session: Session) -> Dict[str, int]:
                 map_lengths[mapped_arg_name] += length
         return map_lengths
 
+    @cache
     def _resolve_map_lengths(self, run_id: str, *, session: Session) -> Dict[str, int]:

Review Comment:
   It’ll be difficult to actually hit this cache due to `session` :(
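
A minimal sketch of the concern, assuming the cache keys on every argument the way `functools.lru_cache` does (used here as a stand-in for the `cache` decorator in the diff). `FakeSession` and the function body are hypothetical.

```python
from functools import lru_cache
from typing import List


class FakeSession:
    """Hypothetical session object; hashable by identity like any plain object."""


calls: List[str] = []


@lru_cache(maxsize=None)
def resolve_map_lengths(run_id: str, *, session: FakeSession) -> int:
    # Record each real computation so cache hits and misses are visible.
    calls.append(run_id)
    return 3


s1, s2 = FakeSession(), FakeSession()
resolve_map_lengths("run_1", session=s1)  # miss: first call
resolve_map_lengths("run_1", session=s1)  # hit: same run_id and same session object
resolve_map_lengths("run_1", session=s2)  # miss: same run_id but a different session
info = resolve_map_lengths.cache_info()
```

The cache only helps when the very same session object is passed again, which is the crux of the comment above.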



##########
airflow/models/mappedoperator.py:
##########
@@ -484,14 +486,16 @@ def _get_unmap_kwargs(self) -> Dict[str, Any]:
             **self.mapped_kwargs,
         }
 
-    def unmap(self) -> "BaseOperator":
+    def unmap(self, rendered_kwargs: Optional[Dict[str, Any]] = None) -> "BaseOperator":

Review Comment:
   Probably should document `rendered_kwargs` (and maybe it’s better to call it `unmap_kwargs` for consistency?)



##########
airflow/models/xcom_arg.py:
##########
@@ -136,12 +139,15 @@ def set_downstream(
         """Proxy to underlying operator set_downstream method. Required by TaskMixin."""
         self.operator.set_downstream(task_or_task_list, edge_modifier)
 
-    def resolve(self, context: Context) -> Any:
+    @provide_session
+    def resolve(self, context: Context, session: "Session" = NEW_SESSION) -> Any:

Review Comment:
   IIRC I wondered if we could do this previously, but eventually did not due to interface compatibility issues.





[GitHub] [airflow] uranusjr commented on a diff in pull request #22975: Resolve XComArg's before trying to unmap MappedOperators

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #22975:
URL: https://github.com/apache/airflow/pull/22975#discussion_r849684409


##########
airflow/models/mappedoperator.py:
##########
@@ -686,33 +693,40 @@ def render_template_fields(
         """
         if not jinja_env:
             jinja_env = self.get_template_env()
-        unmapped_task = self.unmap()
+        # Before we unmap we have to resolve the mapped arguments, otherwise the real operator constructor
+        # could be called with an XComArg, rather than the value it resolves to.
+        #
+        # We also need to resolve _all_ mapped arguments, even if they aren't marked as templated
+        kwargs = self._get_unmap_kwargs()
+
+        template_fields = set(self.template_fields)
+
+        self._resolve_expansion_kwargs(kwargs, template_fields, context, session)
+
+        unmapped_task = self.unmap(rendered_kwargs=kwargs)
         self._do_render_template_fields(
             parent=unmapped_task,
-            template_fields=unmapped_task.template_fields,
+            template_fields=template_fields,
             context=context,
             jinja_env=jinja_env,
             seen_oids=set(),
         )
         return unmapped_task
 
-    def _render_template_field(

Review Comment:
   Oh missed that. Cool.





[GitHub] [airflow] ashb commented on a diff in pull request #22975: Resolve XComArg's before trying to unmap MappedOperators

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22975:
URL: https://github.com/apache/airflow/pull/22975#discussion_r849685251


##########
airflow/models/mappedoperator.py:
##########
@@ -670,10 +675,12 @@ def prepare_for_execution(self) -> "MappedOperator":
         # we don't need to create a copy of the MappedOperator here.
         return self
 
+    @provide_session
     def render_template_fields(
         self,
         context: Context,
         jinja_env: Optional["jinja2.Environment"] = None,
+        session: Session = NEW_SESSION,

Review Comment:
   Done.





[GitHub] [airflow] jedcunningham commented on a diff in pull request #22975: Resolve XComArgs before trying to unmap MappedOperators

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #22975:
URL: https://github.com/apache/airflow/pull/22975#discussion_r849889616


##########
airflow/models/mappedoperator.py:
##########
@@ -484,14 +486,26 @@ def _get_unmap_kwargs(self) -> Dict[str, Any]:
             **self.mapped_kwargs,
         }
 
-    def unmap(self) -> "BaseOperator":
-        """Get the "normal" Operator after applying the current mapping."""
+    def unmap(self, unmap_kwargs: Optional[Dict[str, Any]] = None) -> "BaseOperator":
+        """
+        Get the "normal" Operator after applying the current mapping.
+
+        If ``operator_class`` is not a class (i.e. this DAG has been deserialized) then this will return a
+        SeriSerializedBaseOperator that aims to "look like" the real operator.

Review Comment:
   ```suggestion
           SerializedBaseOperator that aims to "look like" the real operator.
   ```



##########
docs/apache-airflow/concepts/dynamic-task-mapping.rst:
##########
@@ -224,6 +224,42 @@ Currently it is only possible to map against a dict, a list, or one of those typ
 
 If an upstream task returns an unmappable type, the mapped task will fail at run-time with an ``UnmappableXComTypePushed`` exception. For instance, you can't have the upstream task return a plain string – it must be a list or a dict.
 
+How to templated fields and mapped arguments interact?

Review Comment:
   ```suggestion
   How do templated fields and mapped arguments interact?
   ```



##########
airflow/decorators/base.py:
##########
@@ -441,13 +443,38 @@ def _get_unmap_kwargs(self) -> Dict[str, Any]:
             **self.mapped_kwargs,
         }
 
-    def _expand_mapped_field(self, key: str, content: Any, context: Context, *, session: Session) -> Any:
-        if key != "op_kwargs" or not isinstance(content, collections.abc.Mapping):
-            return content
-        # The magic super() doesn't work here, so we use the explicit form.
-        # Not using super(..., self) to work around pyupgrade bug.
-        sup: Any = super(DecoratedMappedOperator, DecoratedMappedOperator)
-        return {k: sup._expand_mapped_field(self, k, v, context, session=session) for k, v in content.items()}
+    def _resolve_expansion_kwargs(
+        self, kwargs: Dict[str, Any], template_fields: Set[str], context: Context, session: "Session"
+    ) -> None:
+        expansion_kwargs = self._get_expansion_kwargs()
+
+        self._already_resolved_op_kwargs = set()
+        for k, v in expansion_kwargs.items():
+            if isinstance(v, XComArg):
+                self._already_resolved_op_kwargs.add(k)
+                v = v.resolve(context, session=session)
+            v = self._expand_mapped_field(k, v, context, session=session)
+            kwargs['op_kwargs'][k] = v
+            template_fields.discard(k)
+
+    def render_template(
+        self,
+        value: Any,
+        context: Context,
+        jinja_env: Optional["jinja2.Environment"] = None,
+        seen_oids: Optional[Set] = None,
+    ) -> Any:
+        if hasattr(self, '_combined_op_kwargs') and value is self._combined_op_kwargs:
+            # Avoid rendering values that came oout of resolved XComArgs

Review Comment:
   ```suggestion
               # Avoid rendering values that came out of resolved XComArgs
   ```





[GitHub] [airflow] jedcunningham closed pull request #22975: Resolve XComArgs before trying to unmap MappedOperators

Posted by GitBox <gi...@apache.org>.
jedcunningham closed pull request #22975: Resolve XComArgs before trying to unmap MappedOperators
URL: https://github.com/apache/airflow/pull/22975




[GitHub] [airflow] ashb commented on a diff in pull request #22975: Resolve XComArg's before trying to unmap MappedOperators

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22975:
URL: https://github.com/apache/airflow/pull/22975#discussion_r849685509


##########
airflow/models/mappedoperator.py:
##########
@@ -484,14 +486,16 @@ def _get_unmap_kwargs(self) -> Dict[str, Any]:
             **self.mapped_kwargs,
         }
 
-    def unmap(self) -> "BaseOperator":
+    def unmap(self, rendered_kwargs: Optional[Dict[str, Any]] = None) -> "BaseOperator":

Review Comment:
   Done





[GitHub] [airflow] ashb commented on a diff in pull request #22975: Resolve XComArg's before trying to unmap MappedOperators

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22975:
URL: https://github.com/apache/airflow/pull/22975#discussion_r849652248


##########
airflow/models/mappedoperator.py:
##########
@@ -670,10 +675,12 @@ def prepare_for_execution(self) -> "MappedOperator":
         # we don't need to create a copy of the MappedOperator here.
         return self
 
+    @provide_session
     def render_template_fields(
         self,
         context: Context,
         jinja_env: Optional["jinja2.Environment"] = None,
+        session: Session = NEW_SESSION,

Review Comment:
   Oh probably yeah. (It's a mess. I don't like that `@provide_session` expunges all. Add it to the list to fix)





[GitHub] [airflow] ashb commented on a diff in pull request #22975: Resolve XComArg's before trying to unmap MappedOperators

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22975:
URL: https://github.com/apache/airflow/pull/22975#discussion_r849659308


##########
airflow/models/mappedoperator.py:
##########
@@ -686,33 +693,40 @@ def render_template_fields(
         """
         if not jinja_env:
             jinja_env = self.get_template_env()
-        unmapped_task = self.unmap()
+        # Before we unmap we have to resolve the mapped arguments, otherwise the real operator constructor
+        # could be called with an XComArg, rather than the value it resolves to.
+        #
+        # We also need to resolve _all_ mapped arguments, even if they aren't marked as templated
+        kwargs = self._get_unmap_kwargs()
+
+        template_fields = set(self.template_fields)
+
+        self._resolve_expansion_kwargs(kwargs, template_fields, context, session)
+
+        unmapped_task = self.unmap(rendered_kwargs=kwargs)
         self._do_render_template_fields(
             parent=unmapped_task,
-            template_fields=unmapped_task.template_fields,
+            template_fields=template_fields,
             context=context,
             jinja_env=jinja_env,
             seen_oids=set(),
         )
         return unmapped_task
 
-    def _render_template_field(

Review Comment:
   Do you mean https://github.com/apache/airflow/pull/22975/files#diff-f373d874912ccfa03918e853ad15aa91d6bfaa1ee75f1676f78c8a756f332ed0L355





[GitHub] [airflow] jedcunningham merged pull request #22975: Resolve XComArgs before trying to unmap MappedOperators

Posted by GitBox <gi...@apache.org>.
jedcunningham merged PR #22975:
URL: https://github.com/apache/airflow/pull/22975




[GitHub] [airflow] ashb commented on a diff in pull request #22975: Resolve XComArg's before trying to unmap MappedOperators

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22975:
URL: https://github.com/apache/airflow/pull/22975#discussion_r849657477


##########
airflow/models/mappedoperator.py:
##########
@@ -569,6 +573,7 @@ def _get_map_lengths(self, run_id: str, *, session: Session) -> Dict[str, int]:
                 map_lengths[mapped_arg_name] += length
         return map_lengths
 
+    @cache
     def _resolve_map_lengths(self, run_id: str, *, session: Session) -> Dict[str, int]:

Review Comment:
   ```python-console
   In [2]: @provide_session
      ...: def a(session):
      ...:     print(id(session))
      ...:     b()
   
   In [4]: @provide_session
      ...: def b(session):
      ...:     print(id(session))
   
   In [5]: a()
   140533469520240
   140533469520240
   
   ```
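
One mechanism that would produce identical ids like this is a thread-local registry that hands back the same object on every call within a thread (SQLAlchemy's `scoped_session` is one such registry). Whether that is what happens here is not confirmed by this thread; the sketch below uses only the standard library, and `FakeSession` and `ThreadLocalRegistry` are hypothetical names.

```python
import threading
from typing import Callable, List


class FakeSession:
    """Hypothetical stand-in for a database session."""


class ThreadLocalRegistry:
    """Return one object per thread, creating it on first use."""

    def __init__(self, factory: Callable[[], FakeSession]) -> None:
        self._factory = factory
        self._local = threading.local()

    def __call__(self) -> FakeSession:
        try:
            return self._local.value
        except AttributeError:
            self._local.value = self._factory()
            return self._local.value


Session = ThreadLocalRegistry(FakeSession)

# Two calls in the same thread yield the very same object.
first = Session()
second = Session()
same_in_thread = first is second

# A different thread gets its own object.
seen_elsewhere: List[FakeSession] = []
worker = threading.Thread(target=lambda: seen_elsewhere.append(Session()))
worker.start()
worker.join()
different_across_threads = seen_elsewhere[0] is not first
```

Under that assumption, a cache keyed on the session would be hit by repeated calls from the same thread.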





[GitHub] [airflow] ashb commented on a diff in pull request #22975: Resolve XComArg's before trying to unmap MappedOperators

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22975:
URL: https://github.com/apache/airflow/pull/22975#discussion_r849653144


##########
airflow/models/mappedoperator.py:
##########
@@ -569,6 +573,7 @@ def _get_map_lengths(self, run_id: str, *, session: Session) -> Dict[str, int]:
                 map_lengths[mapped_arg_name] += length
         return map_lengths
 
+    @cache
     def _resolve_map_lengths(self, run_id: str, *, session: Session) -> Dict[str, int]:

Review Comment:
   A Session has a longer lifetime than you might expect due to SQLAlchemy's pooling.


