You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "uranusjr (via GitHub)" <gi...@apache.org> on 2023/03/10 10:37:42 UTC

[GitHub] [airflow] uranusjr commented on a diff in pull request #29913: Fix mapped tasks partial arguments when DAG default args are provided

uranusjr commented on code in PR #29913:
URL: https://github.com/apache/airflow/pull/29913#discussion_r1132213509


##########
airflow/models/baseoperator.py:
##########
@@ -240,72 +240,123 @@ def partial(
         task_id = task_group.child_id(task_id)
 
     # Merge DAG and task group level defaults into user-supplied values.
-    partial_kwargs, partial_params = get_merged_defaults(
+    default_partial_kwargs, partial_params = get_merged_defaults(
         dag=dag,
         task_group=task_group,
         task_params=params,
         task_default_args=kwargs.pop("default_args", None),
     )
-    partial_kwargs.update(kwargs)
-
-    # Always fully populate partial kwargs to exclude them from map().
-    partial_kwargs.setdefault("dag", dag)
-    partial_kwargs.setdefault("task_group", task_group)
-    partial_kwargs.setdefault("task_id", task_id)
-    partial_kwargs.setdefault("start_date", start_date)
-    partial_kwargs.setdefault("end_date", end_date)
-    partial_kwargs.setdefault("owner", owner)
-    partial_kwargs.setdefault("email", email)
-    partial_kwargs.setdefault("trigger_rule", trigger_rule)
-    partial_kwargs.setdefault("depends_on_past", depends_on_past)
-    partial_kwargs.setdefault("ignore_first_depends_on_past", ignore_first_depends_on_past)
-    partial_kwargs.setdefault("wait_for_past_depends_before_skipping", wait_for_past_depends_before_skipping)
-    partial_kwargs.setdefault("wait_for_downstream", wait_for_downstream)
-    partial_kwargs.setdefault("retries", retries)
-    partial_kwargs.setdefault("queue", queue)
-    partial_kwargs.setdefault("pool", pool)
-    partial_kwargs.setdefault("pool_slots", pool_slots)
-    partial_kwargs.setdefault("execution_timeout", execution_timeout)
-    partial_kwargs.setdefault("max_retry_delay", max_retry_delay)
-    partial_kwargs.setdefault("retry_delay", retry_delay)
-    partial_kwargs.setdefault("retry_exponential_backoff", retry_exponential_backoff)
-    partial_kwargs.setdefault("priority_weight", priority_weight)
-    partial_kwargs.setdefault("weight_rule", weight_rule)
-    partial_kwargs.setdefault("sla", sla)
-    partial_kwargs.setdefault("max_active_tis_per_dag", max_active_tis_per_dag)
-    partial_kwargs.setdefault("on_execute_callback", on_execute_callback)
-    partial_kwargs.setdefault("on_failure_callback", on_failure_callback)
-    partial_kwargs.setdefault("on_retry_callback", on_retry_callback)
-    partial_kwargs.setdefault("on_success_callback", on_success_callback)
-    partial_kwargs.setdefault("run_as_user", run_as_user)
-    partial_kwargs.setdefault("executor_config", executor_config)
-    partial_kwargs.setdefault("inlets", inlets or [])
-    partial_kwargs.setdefault("outlets", outlets or [])
-    partial_kwargs.setdefault("resources", resources)
-    partial_kwargs.setdefault("doc", doc)
-    partial_kwargs.setdefault("doc_json", doc_json)
-    partial_kwargs.setdefault("doc_md", doc_md)
-    partial_kwargs.setdefault("doc_rst", doc_rst)
-    partial_kwargs.setdefault("doc_yaml", doc_yaml)
+
+    # Create partial_kwargs from args and kwargs
+    partial_kwargs = {
+        **kwargs,
+        "dag": dag,
+        "task_group": task_group,
+        "task_id": task_id,
+        "start_date": start_date,
+        "end_date": end_date,
+        "owner": owner,
+        "email": email,
+        "trigger_rule": trigger_rule,
+        "depends_on_past": depends_on_past,
+        "ignore_first_depends_on_past": ignore_first_depends_on_past,
+        "wait_for_past_depends_before_skipping": wait_for_past_depends_before_skipping,
+        "wait_for_downstream": wait_for_downstream,
+        "retries": retries,
+        "queue": queue,
+        "pool": pool,
+        "pool_slots": pool_slots,
+        "execution_timeout": execution_timeout,
+        "max_retry_delay": max_retry_delay,
+        "retry_delay": retry_delay,
+        "retry_exponential_backoff": retry_exponential_backoff,
+        "priority_weight": priority_weight,
+        "weight_rule": weight_rule,
+        "sla": sla,
+        "max_active_tis_per_dag": max_active_tis_per_dag,
+        "on_execute_callback": on_execute_callback,
+        "on_failure_callback": on_failure_callback,
+        "on_retry_callback": on_retry_callback,
+        "on_success_callback": on_success_callback,
+        "run_as_user": run_as_user,
+        "executor_config": executor_config,
+        "inlets": inlets,
+        "outlets": outlets,
+        "resources": resources,
+        "doc": doc,
+        "doc_json": doc_json,
+        "doc_md": doc_md,
+        "doc_rst": doc_rst,
+        "doc_yaml": doc_yaml,
+    }
+
+    # Override None kwargs by dag default values
+    for k, v in default_partial_kwargs.items():
+        if partial_kwargs.get(k) is None:
+            partial_kwargs[k] = v
+
+    # Override None kwargs which don't have a dag default value by Airflow default value
+    partial_kwargs["owner"] = partial_kwargs["owner"] or DEFAULT_OWNER

Review Comment:
   These are fine but I wonder if this can be done better, for example with a dict of defaults and a loop like this:
   
   ```python
   partial_kwargs = {
       k: v if v is not None else DEFAULT_VALUES.get(k, v)
       for k, v in partial_kwargs.items()
   }
   ```
   
   And would `ArgNotSet` be better as a sentinel than `None`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org