Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/09/01 08:19:33 UTC

[GitHub] [airflow] uranusjr opened a new pull request, #26100: Split out and handle 'params' in mapped operator

uranusjr opened a new pull request, #26100:
URL: https://github.com/apache/airflow/pull/26100

   This should allow using `params` in expanding kwargs. Tests are still to come; I need to make sure the changes don’t break any existing functionality.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] uranusjr commented on a diff in pull request #26100: Split out and handle 'params' in mapped operator

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26100:
URL: https://github.com/apache/airflow/pull/26100#discussion_r965338277


##########
airflow/models/mappedoperator.py:
##########
@@ -162,6 +165,7 @@ class OperatorPartial:
     """
 
     operator_class: Type["BaseOperator"]
+    params: Union[ParamsDict, dict]

Review Comment:
   To be able to expand a task against `params`, we need to be able to “merge” default DAG- and task-level params with the user-mapped params. So `params` is split out of other partial kwargs and treated specially. Partial kwargs should _never_ contain the `"params"` key.
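A minimal sketch of the invariant described above, assuming a hypothetical `PartialSketch` class standing in for `OperatorPartial` (plain dicts instead of Airflow's `ParamsDict`, and a made-up `__post_init__` check). This is an illustration, not the actual Airflow class:

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class PartialSketch:
    """Hypothetical stand-in for OperatorPartial; not Airflow's real class."""

    operator_name: str
    kwargs: Dict[str, Any] = field(default_factory=dict)
    # Kept apart from kwargs so it can later be merged with mapped params.
    params: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        # The invariant: partial kwargs must never contain a "params" key.
        if "params" in self.kwargs:
            raise ValueError("'params' must be passed separately, not in kwargs")


ok = PartialSketch("BashOperator", kwargs={"retries": 2}, params={"env": "dev"})

rejected = False
try:
    PartialSketch("BashOperator", kwargs={"params": {"env": "dev"}})
except ValueError:
    rejected = True  # the invariant was enforced
```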



##########
airflow/models/baseoperator.py:
##########
@@ -307,7 +306,11 @@ def partial(
     partial_kwargs["executor_config"] = partial_kwargs["executor_config"] or {}
     partial_kwargs["resources"] = coerce_resources(partial_kwargs["resources"])
 
-    return OperatorPartial(operator_class=operator_class, kwargs=partial_kwargs)
+    return OperatorPartial(
+        operator_class=operator_class,
+        kwargs=partial_kwargs,
+        params=partial_params,

Review Comment:
   … but pass them to this separate attribute.
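A rough sketch of how `params` might be routed out of the partial kwargs and layered over DAG-level defaults before being handed to the separate attribute. The helper name `split_partial_kwargs` and its signature are hypothetical; the real `partial()` in `baseoperator.py` works with `ParamsDict` and many more arguments:

```python
from typing import Any, Dict, Optional, Tuple


def split_partial_kwargs(
    kwargs: Dict[str, Any],
    default_params: Optional[Dict[str, Any]] = None,
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Hypothetical helper: separate "params" from the other partial kwargs.

    DAG-level defaults are copied first, then task-level params override them.
    """
    remaining = dict(kwargs)  # avoid mutating the caller's dict
    partial_params = dict(default_params or {})
    partial_params.update(remaining.pop("params", None) or {})
    return remaining, partial_params


kwargs, params = split_partial_kwargs(
    {"task_id": "t", "params": {"b": 20}},
    default_params={"a": 1, "b": 2},
)
print(kwargs)  # {'task_id': 't'}
print(params)  # {'a': 1, 'b': 20}
```

The returned kwargs never carry a `"params"` key, which keeps the invariant from the `OperatorPartial` comment.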



##########
airflow/models/baseoperator.py:
##########
@@ -1183,17 +1186,17 @@ def render_template_fields(
         context: Context,
         jinja_env: Optional["jinja2.Environment"] = None,
     ) -> Optional["BaseOperator"]:
-        """Template all attributes listed in template_fields.
+        """Template all attributes listed in *self.template_fields*.
 
         This mutates the attributes in-place and is irreversible.
 
-        :param context: Dict with values to apply on content
-        :param jinja_env: Jinja environment
+        :param context: Context dict with values to apply on content.
+        :param jinja_env: Jinja environment to use for rendering.
         """
         if not jinja_env:
             jinja_env = self.get_template_env()
         self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
-        return self
+        return None

Review Comment:
   This does not change any behaviour; I only did it to make the behaviour easier to explain…



##########
airflow/models/taskinstance.py:
##########
@@ -2195,10 +2181,10 @@ def render_templates(self, context: Optional[Context] = None) -> "Operator":
         """
         if not context:
             context = self.get_template_context()
+        original_task = self.task
         rendered_task = self.task.render_template_fields(context)
-        if rendered_task is None:  # Compatibility -- custom renderer, assume unmapped.
-            return self.task
-        original_task, self.task = self.task, rendered_task
+        if rendered_task is not None:  # Mapped operator, assign unmapped task.
+            self.task = rendered_task
         return original_task

Review Comment:
   … here. Previously it was difficult to follow when `rendered_task` is None and when it isn’t; now it’s simple: a mapped operator (MappedOperator subclasses) performs unmapping and returns the unmapped task, while a non-mapped operator (BaseOperator subclasses) returns None because no unmapping is needed.
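The return-value contract described above can be sketched with toy classes. `UnmappedTask`, `MappedTask`, and `TaskInstanceSketch` are hypothetical stand-ins, not Airflow classes; only the shape of `render_templates` mirrors the diff:

```python
from typing import Optional


class UnmappedTask:
    """Hypothetical non-mapped operator: renders in place, returns None."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.rendered = False

    def render_template_fields(self, context: dict) -> Optional["UnmappedTask"]:
        self.rendered = True  # mutate attributes in place
        return None  # nothing to unmap


class MappedTask:
    """Hypothetical mapped operator: returns a freshly unmapped task."""

    def __init__(self, name: str) -> None:
        self.name = name

    def render_template_fields(self, context: dict) -> Optional[UnmappedTask]:
        task = UnmappedTask(self.name)
        task.render_template_fields(context)
        return task


class TaskInstanceSketch:
    def __init__(self, task) -> None:
        self.task = task

    def render_templates(self, context: dict):
        original_task = self.task
        rendered_task = self.task.render_template_fields(context)
        if rendered_task is not None:  # mapped operator: assign the unmapped task
            self.task = rendered_task
        return original_task


plain_ti = TaskInstanceSketch(UnmappedTask("plain"))
returned = plain_ti.render_templates({})  # same object stays on the instance

mapped_ti = TaskInstanceSketch(MappedTask("mapped"))
original = mapped_ti.render_templates({})  # instance now holds the unmapped task
```

In both cases the caller gets back the task as it was before rendering, so only the `is not None` branch has to reason about unmapping.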



##########
airflow/models/baseoperator.py:
##########
@@ -256,7 +256,6 @@ def partial(
     partial_kwargs.setdefault("end_date", end_date)
     partial_kwargs.setdefault("owner", owner)
     partial_kwargs.setdefault("email", email)
-    partial_kwargs.setdefault("params", default_params)

Review Comment:
   So we don’t set the default params in `partial_kwargs`…



##########
airflow/decorators/base.py:
##########
@@ -363,7 +363,6 @@ def _expand(self, expand_input: ExpandInput, *, strict: bool) -> XComArg:
         task_id = get_unique_task_id(partial_kwargs.pop("task_id"), dag, task_group)
         if task_group:
             task_id = task_group.child_id(task_id)
-        params = partial_kwargs.pop("params", None) or default_params

Review Comment:
   And this line is deleted since it’s now a no-op.



##########
airflow/models/mappedoperator.py:
##########
@@ -565,16 +568,24 @@ def _get_unmap_kwargs(self, mapped_kwargs: Mapping[str, Any], *, strict: bool) -
                 mapped_kwargs,
                 fail_reason="unmappable or already specified",
             )
-        # Ordering is significant; mapped kwargs should override partial ones.
+
+        # If params appears in the mapped kwargs, we need to merge it into the
+        # partial params, overriding existing keys.
+        params = copy.copy(self.params)
+        with contextlib.suppress(KeyError):
+            params.update(mapped_kwargs["params"])
+
+        # Ordering is significant; mapped kwargs should override partial ones,
+        # and the specially handled params should be respected.
         return {
             "task_id": self.task_id,
             "dag": self.dag,
             "task_group": self.task_group,
-            "params": self.params,
             "start_date": self.start_date,
             "end_date": self.end_date,
             **self.partial_kwargs,
             **mapped_kwargs,
+            "params": params,
         }

Review Comment:
   Actual merging happens here. The merged `params` is put last so it overrides the incomplete `mapped_kwargs["params"]`.
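A standalone sketch of the merge shown in the diff, assuming plain dicts and a hypothetical free function `merge_unmap_kwargs` in place of the `_get_unmap_kwargs` method. It reuses the same `copy.copy` plus `contextlib.suppress(KeyError)` pattern and puts the merged `params` last:

```python
import contextlib
import copy
from typing import Any, Dict, Mapping


def merge_unmap_kwargs(
    partial_kwargs: Mapping[str, Any],
    partial_params: Dict[str, Any],
    mapped_kwargs: Mapping[str, Any],
) -> Dict[str, Any]:
    # Copy so the partial-level params shared by all mapped instances stay untouched.
    params = copy.copy(partial_params)
    with contextlib.suppress(KeyError):  # mapped kwargs may not mention params
        params.update(mapped_kwargs["params"])
    return {
        **partial_kwargs,
        **mapped_kwargs,  # may contain an incomplete "params"...
        "params": params,  # ...which the merged dict then overrides
    }


base_params = {"a": 1, "b": 2}
result = merge_unmap_kwargs(
    partial_kwargs={"retries": 1},
    partial_params=base_params,
    mapped_kwargs={"params": {"b": 3}, "bash_command": "echo hi"},
)
print(result)
# {'retries': 1, 'params': {'a': 1, 'b': 3}, 'bash_command': 'echo hi'}
```

Without the final `"params"` entry, the mapped `{"b": 3}` would replace the partial params wholesale and key `"a"` would be lost.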





[GitHub] [airflow] uranusjr merged pull request #26100: Split out and handle 'params' in mapped operator

uranusjr merged PR #26100:
URL: https://github.com/apache/airflow/pull/26100




[GitHub] [airflow] uranusjr commented on pull request #26100: Split out and handle 'params' in mapped operator

uranusjr commented on PR #26100:
URL: https://github.com/apache/airflow/pull/26100#issuecomment-1259212296

   This is going to conflict with #26702 and I want to get that one in first for 2.4.1.

