Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/13 20:56:52 UTC

[GitHub] [airflow] jedcunningham commented on a diff in pull request #22975: Resolve XComArgs before trying to unmap MappedOperators

jedcunningham commented on code in PR #22975:
URL: https://github.com/apache/airflow/pull/22975#discussion_r849889616


##########
airflow/models/mappedoperator.py:
##########
@@ -484,14 +486,26 @@ def _get_unmap_kwargs(self) -> Dict[str, Any]:
             **self.mapped_kwargs,
         }
 
-    def unmap(self) -> "BaseOperator":
-        """Get the "normal" Operator after applying the current mapping."""
+    def unmap(self, unmap_kwargs: Optional[Dict[str, Any]] = None) -> "BaseOperator":
+        """
+        Get the "normal" Operator after applying the current mapping.
+
+        If ``operator_class`` is not a class (i.e. this DAG has been deserialized) then this will return a
+        SeriSerializedBaseOperator that aims to "look like" the real operator.

Review Comment:
   ```suggestion
           SerializedBaseOperator that aims to "look like" the real operator.
   ```



##########
docs/apache-airflow/concepts/dynamic-task-mapping.rst:
##########
@@ -224,6 +224,42 @@ Currently it is only possible to map against a dict, a list, or one of those typ
 
 If an upstream task returns an unmappable type, the mapped task will fail at run-time with an ``UnmappableXComTypePushed`` exception. For instance, you can't have the upstream task return a plain string – it must be a list or a dict.
 
+How to templated fields and mapped arguments interact?

Review Comment:
   ```suggestion
   How do templated fields and mapped arguments interact?
   ```



##########
airflow/decorators/base.py:
##########
@@ -441,13 +443,38 @@ def _get_unmap_kwargs(self) -> Dict[str, Any]:
             **self.mapped_kwargs,
         }
 
-    def _expand_mapped_field(self, key: str, content: Any, context: Context, *, session: Session) -> Any:
-        if key != "op_kwargs" or not isinstance(content, collections.abc.Mapping):
-            return content
-        # The magic super() doesn't work here, so we use the explicit form.
-        # Not using super(..., self) to work around pyupgrade bug.
-        sup: Any = super(DecoratedMappedOperator, DecoratedMappedOperator)
-        return {k: sup._expand_mapped_field(self, k, v, context, session=session) for k, v in content.items()}
+    def _resolve_expansion_kwargs(
+        self, kwargs: Dict[str, Any], template_fields: Set[str], context: Context, session: "Session"
+    ) -> None:
+        expansion_kwargs = self._get_expansion_kwargs()
+
+        self._already_resolved_op_kwargs = set()
+        for k, v in expansion_kwargs.items():
+            if isinstance(v, XComArg):
+                self._already_resolved_op_kwargs.add(k)
+                v = v.resolve(context, session=session)
+            v = self._expand_mapped_field(k, v, context, session=session)
+            kwargs['op_kwargs'][k] = v
+            template_fields.discard(k)
+
+    def render_template(
+        self,
+        value: Any,
+        context: Context,
+        jinja_env: Optional["jinja2.Environment"] = None,
+        seen_oids: Optional[Set] = None,
+    ) -> Any:
+        if hasattr(self, '_combined_op_kwargs') and value is self._combined_op_kwargs:
+            # Avoid rendering values that came oout of resolved XComArgs

Review Comment:
   ```suggestion
               # Avoid rendering values that came out of resolved XComArgs
   ```


