Posted to commits@airflow.apache.org by "hussein-awala (via GitHub)" <gi...@apache.org> on 2023/03/04 01:16:00 UTC

[GitHub] [airflow] hussein-awala opened a new pull request, #29913: Fix mapped tasks partial arguments when DAG default args are provided

hussein-awala opened a new pull request, #29913:
URL: https://github.com/apache/airflow/pull/29913

   closes: #29903
   
   ---
   Currently, `BaseOperator` overrides a mapped task's partial arguments with the DAG default args whenever those defaults are provided, regardless of the values passed to `partial`. By definition, Airflow should use the default args only when the corresponding operator argument is not provided.
   
   This PR fills the `partial_kwargs` dictionary with the args and kwargs of the `partial` method, and then replaces only the `None` values with the default args values.
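
The intended precedence can be sketched as follows. This is a minimal illustration, not the Airflow implementation: `merge_partial_kwargs` and the sample values are hypothetical, and `None` stands in for "argument not provided", as in the description above.

```python
from __future__ import annotations

from typing import Any


def merge_partial_kwargs(explicit: dict[str, Any], dag_defaults: dict[str, Any]) -> dict[str, Any]:
    """Start from the explicit arguments and fill only the missing (None) ones."""
    merged = dict(explicit)
    for key, value in dag_defaults.items():
        # A DAG default applies only when the caller did not provide a value.
        if merged.get(key) is None:
            merged[key] = value
    return merged


# Hypothetical inputs: `retries` is given explicitly, `owner` is not.
explicit_args = {"retries": 0, "owner": None}
dag_default_args = {"retries": 3, "owner": "data-team"}

result = merge_partial_kwargs(explicit_args, dag_default_args)
print(result)
```

Under these assumptions the explicit `retries=0` survives, while `owner` is taken from the DAG defaults because it was left unset.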


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] uranusjr commented on a diff in pull request #29913: Fix mapped tasks partial arguments when DAG default args are provided

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29913:
URL: https://github.com/apache/airflow/pull/29913#discussion_r1165076256


##########
airflow/models/baseoperator.py:
##########
@@ -292,34 +312,11 @@ def partial(
         "doc_yaml": doc_yaml,
     }
 
-    DEFAULT_VALUES: dict[str, Any] = {
-        "owner": DEFAULT_OWNER,
-        "trigger_rule": DEFAULT_TRIGGER_RULE,
-        "depends_on_past": False,
-        "ignore_first_depends_on_past": DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST,
-        "wait_for_past_depends_before_skipping": DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING,
-        "wait_for_downstream": False,
-        "retries": DEFAULT_RETRIES,
-        "queue": DEFAULT_QUEUE,
-        "pool_slots": DEFAULT_POOL_SLOTS,
-        "execution_timeout": DEFAULT_TASK_EXECUTION_TIMEOUT,
-        "retry_delay": DEFAULT_RETRY_DELAY,
-        "retry_exponential_backoff": False,
-        "priority_weight": DEFAULT_PRIORITY_WEIGHT,
-        "weight_rule": DEFAULT_WEIGHT_RULE,
-        "inlets": [],
-        "outlets": [],
-    }
+    # Populate kwargs from DAG-level default args.
+    partial_kwargs.update((k, v) for k, v in dag_default_args.items() if partial_kwargs.get(k) is NOTSET)
 
-    # Override NOTSET kwargs by dag default values
-    for k, v in default_partial_kwargs.items():
-        if partial_kwargs.get(k) is NOTSET:
-            partial_kwargs[k] = v
-
-    # Override NOTSET kwargs which don't have a dag default value by Airflow default value or None
-    partial_kwargs = {
-        k: v if v is not NOTSET else DEFAULT_VALUES.get(k, None) for k, v in partial_kwargs.items()
-    }
+    # Populate kwargs from Airflow default constants.
+    partial_kwargs.update((k, v) for k, v in _PARTIAL_DEFAULTS.items() if partial_kwargs.get(k) is NOTSET)

Review Comment:
   Change this back.





[GitHub] [airflow] mpgreg commented on pull request #29913: Fix mapped tasks partial arguments when DAG default args are provided

Posted by "mpgreg (via GitHub)" <gi...@apache.org>.
mpgreg commented on PR #29913:
URL: https://github.com/apache/airflow/pull/29913#issuecomment-1663997456

   Is there any reason this fix wouldn't also work with the branch decorator?
   
   I'm on `Astronomer Runtime 8.8.0 based on Airflow 2.6.3+astro.2` and branching to dynamically mapped tasks:
   
   
   ```
   @task.branch
   def check_object_count(doc_count: dict) -> str:
       response = blah  # placeholder, as in the original report

       if response:
           return None
       else:
           return ["get_md_docs", "get_rst_docs", "get_code_samples", "get_stack_overflow"]

   @task(trigger_rule='none_failed')
   def get_md_docs(source: dict):
       return blah

   @task(trigger_rule='none_failed')
   def get_rst_docs(source: dict):
       return blah

   @task(trigger_rule='none_failed')
   def get_code_samples(source: dict):
       return blah

   @task(trigger_rule='none_failed')
   def get_stack_overflow(source: dict):
       return blah

   # Each task is dynamically mapped over its own list of sources.
   md_docs = get_md_docs.partial().expand(source=markdown_docs_sources)
   rst_docs = get_rst_docs.partial().expand(source=rst_docs_sources)
   code_samples = get_code_samples.partial().expand(source=code_samples_sources)
   stackoverflow_md = get_stack_overflow.partial().expand(tag=stackoverflow_tags)
   ```
   
   All downstream mapped tasks fail with: 
   ```
   Traceback (most recent call last):
     File "/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1407, in _run_raw_task
       self._execute_task_with_callbacks(context, test_mode)
     File "/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1531, in _execute_task_with_callbacks
       task_orig = self.render_templates(context=context)
     File "/usr/local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 2179, in render_templates
       original_task.render_template_fields(context)
     File "/usr/local/lib/python3.10/site-packages/airflow/models/mappedoperator.py", line 692, in render_template_fields
       unmapped_task = self.unmap(mapped_kwargs)
     File "/usr/local/lib/python3.10/site-packages/airflow/models/mappedoperator.py", line 615, in unmap
       op = self.operator_class(**kwargs, _airflow_from_mapped=True)
     File "/usr/local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 429, in apply_defaults
       result = func(self, **kwargs, default_args=default_args)
     File "/usr/local/lib/python3.10/site-packages/airflow/decorators/python.py", line 49, in __init__
       super().__init__(
     File "/usr/local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 429, in apply_defaults
       result = func(self, **kwargs, default_args=default_args)
     File "/usr/local/lib/python3.10/site-packages/airflow/decorators/base.py", line 212, in __init__
       super().__init__(task_id=task_id, **kwargs_to_upstream, **kwargs)
     File "/usr/local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 429, in apply_defaults
       result = func(self, **kwargs, default_args=default_args)
     File "/usr/local/lib/python3.10/site-packages/airflow/operators/python.py", line 166, in __init__
       super().__init__(**kwargs)
     File "/usr/local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 429, in apply_defaults
       result = func(self, **kwargs, default_args=default_args)
     File "/usr/local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 788, in __init__
       raise AirflowException(
   airflow.exceptions.AirflowException: Invalid arguments were passed to _PythonDecoratedOperator (task_id: get_md_docs__1). Invalid arguments were:
   **kwargs: {'postgres_conn_id': 'postgres_myid'}
   ```
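
The rejected keyword, `postgres_conn_id`, does not appear in the snippet above, so it presumably comes from a `default_args` dictionary that is not shown. The sketch below only illustrates the general idea of dropping default args that a callable does not accept. `filter_defaults_for` and `make_operator` are hypothetical names, and this is not a claim about how Airflow applies defaults internally.

```python
from __future__ import annotations

import inspect
from typing import Any, Callable


def filter_defaults_for(func: Callable[..., Any], defaults: dict[str, Any]) -> dict[str, Any]:
    """Keep only the defaults that `func` declares as named parameters."""
    named_kinds = (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )
    accepted = {
        name
        for name, param in inspect.signature(func).parameters.items()
        if param.kind in named_kinds
    }
    return {k: v for k, v in defaults.items() if k in accepted}


def make_operator(task_id: str, retries: int = 0) -> dict[str, Any]:
    # Hypothetical stand-in for an operator constructor.
    return {"task_id": task_id, "retries": retries}


# Assumed shape of the DAG-level defaults; only the key names matter here.
default_args = {"retries": 2, "postgres_conn_id": "postgres_myid"}
usable = filter_defaults_for(make_operator, default_args)
print(usable)
```

In this sketch the unknown `postgres_conn_id` entry is dropped before the call, so `make_operator(task_id="t", **usable)` receives only arguments it declares.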




[GitHub] [airflow] hussein-awala commented on a diff in pull request #29913: Fix mapped tasks partial arguments when DAG default args are provided

Posted by "hussein-awala (via GitHub)" <gi...@apache.org>.
hussein-awala commented on code in PR #29913:
URL: https://github.com/apache/airflow/pull/29913#discussion_r1133009140


##########
airflow/models/baseoperator.py:
##########
@@ -240,72 +240,123 @@ def partial(
         task_id = task_group.child_id(task_id)
 
     # Merge DAG and task group level defaults into user-supplied values.
-    partial_kwargs, partial_params = get_merged_defaults(
+    default_partial_kwargs, partial_params = get_merged_defaults(
         dag=dag,
         task_group=task_group,
         task_params=params,
         task_default_args=kwargs.pop("default_args", None),
     )
-    partial_kwargs.update(kwargs)
-
-    # Always fully populate partial kwargs to exclude them from map().
-    partial_kwargs.setdefault("dag", dag)
-    partial_kwargs.setdefault("task_group", task_group)
-    partial_kwargs.setdefault("task_id", task_id)
-    partial_kwargs.setdefault("start_date", start_date)
-    partial_kwargs.setdefault("end_date", end_date)
-    partial_kwargs.setdefault("owner", owner)
-    partial_kwargs.setdefault("email", email)
-    partial_kwargs.setdefault("trigger_rule", trigger_rule)
-    partial_kwargs.setdefault("depends_on_past", depends_on_past)
-    partial_kwargs.setdefault("ignore_first_depends_on_past", ignore_first_depends_on_past)
-    partial_kwargs.setdefault("wait_for_past_depends_before_skipping", wait_for_past_depends_before_skipping)
-    partial_kwargs.setdefault("wait_for_downstream", wait_for_downstream)
-    partial_kwargs.setdefault("retries", retries)
-    partial_kwargs.setdefault("queue", queue)
-    partial_kwargs.setdefault("pool", pool)
-    partial_kwargs.setdefault("pool_slots", pool_slots)
-    partial_kwargs.setdefault("execution_timeout", execution_timeout)
-    partial_kwargs.setdefault("max_retry_delay", max_retry_delay)
-    partial_kwargs.setdefault("retry_delay", retry_delay)
-    partial_kwargs.setdefault("retry_exponential_backoff", retry_exponential_backoff)
-    partial_kwargs.setdefault("priority_weight", priority_weight)
-    partial_kwargs.setdefault("weight_rule", weight_rule)
-    partial_kwargs.setdefault("sla", sla)
-    partial_kwargs.setdefault("max_active_tis_per_dag", max_active_tis_per_dag)
-    partial_kwargs.setdefault("on_execute_callback", on_execute_callback)
-    partial_kwargs.setdefault("on_failure_callback", on_failure_callback)
-    partial_kwargs.setdefault("on_retry_callback", on_retry_callback)
-    partial_kwargs.setdefault("on_success_callback", on_success_callback)
-    partial_kwargs.setdefault("run_as_user", run_as_user)
-    partial_kwargs.setdefault("executor_config", executor_config)
-    partial_kwargs.setdefault("inlets", inlets or [])
-    partial_kwargs.setdefault("outlets", outlets or [])
-    partial_kwargs.setdefault("resources", resources)
-    partial_kwargs.setdefault("doc", doc)
-    partial_kwargs.setdefault("doc_json", doc_json)
-    partial_kwargs.setdefault("doc_md", doc_md)
-    partial_kwargs.setdefault("doc_rst", doc_rst)
-    partial_kwargs.setdefault("doc_yaml", doc_yaml)
+
+    # Create partial_kwargs from args and kwargs
+    partial_kwargs = {
+        **kwargs,
+        "dag": dag,
+        "task_group": task_group,
+        "task_id": task_id,
+        "start_date": start_date,
+        "end_date": end_date,
+        "owner": owner,
+        "email": email,
+        "trigger_rule": trigger_rule,
+        "depends_on_past": depends_on_past,
+        "ignore_first_depends_on_past": ignore_first_depends_on_past,
+        "wait_for_past_depends_before_skipping": wait_for_past_depends_before_skipping,
+        "wait_for_downstream": wait_for_downstream,
+        "retries": retries,
+        "queue": queue,
+        "pool": pool,
+        "pool_slots": pool_slots,
+        "execution_timeout": execution_timeout,
+        "max_retry_delay": max_retry_delay,
+        "retry_delay": retry_delay,
+        "retry_exponential_backoff": retry_exponential_backoff,
+        "priority_weight": priority_weight,
+        "weight_rule": weight_rule,
+        "sla": sla,
+        "max_active_tis_per_dag": max_active_tis_per_dag,
+        "on_execute_callback": on_execute_callback,
+        "on_failure_callback": on_failure_callback,
+        "on_retry_callback": on_retry_callback,
+        "on_success_callback": on_success_callback,
+        "run_as_user": run_as_user,
+        "executor_config": executor_config,
+        "inlets": inlets,
+        "outlets": outlets,
+        "resources": resources,
+        "doc": doc,
+        "doc_json": doc_json,
+        "doc_md": doc_md,
+        "doc_rst": doc_rst,
+        "doc_yaml": doc_yaml,
+    }
+
+    # Override None kwargs by dag default values
+    for k, v in default_partial_kwargs.items():
+        if partial_kwargs.get(k) is None:
+            partial_kwargs[k] = v
+
+    # Override None kwargs which don't have a dag default value by Airflow default value
+    partial_kwargs["owner"] = partial_kwargs["owner"] or DEFAULT_OWNER

Review Comment:
   Great idea, I will implement it.





[GitHub] [airflow] uranusjr commented on a diff in pull request #29913: Fix mapped tasks partial arguments when DAG default args are provided

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29913:
URL: https://github.com/apache/airflow/pull/29913#discussion_r1133452840


##########
airflow/models/baseoperator.py:
##########
@@ -190,42 +191,42 @@ def partial(
     task_id: str,
     dag: DAG | None = None,
     task_group: TaskGroup | None = None,
-    start_date: datetime | None = None,
-    end_date: datetime | None = None,
-    owner: str = DEFAULT_OWNER,
-    email: None | str | Iterable[str] = None,
+    start_date: datetime | None | ArgNotSet = NOTSET,
+    end_date: datetime | None | ArgNotSet = NOTSET,
+    owner: str | None | ArgNotSet = NOTSET,

Review Comment:
   I don’t think the None case is possible here? (Also `trigger_rule` and maybe more below)





[GitHub] [airflow] uranusjr commented on a diff in pull request #29913: Fix mapped tasks partial arguments when DAG default args are provided

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29913:
URL: https://github.com/apache/airflow/pull/29913#discussion_r1161986623


##########
airflow/models/baseoperator.py:
##########
@@ -292,34 +312,11 @@ def partial(
         "doc_yaml": doc_yaml,
     }
 
-    DEFAULT_VALUES: dict[str, Any] = {
-        "owner": DEFAULT_OWNER,
-        "trigger_rule": DEFAULT_TRIGGER_RULE,
-        "depends_on_past": False,
-        "ignore_first_depends_on_past": DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST,
-        "wait_for_past_depends_before_skipping": DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING,
-        "wait_for_downstream": False,
-        "retries": DEFAULT_RETRIES,
-        "queue": DEFAULT_QUEUE,
-        "pool_slots": DEFAULT_POOL_SLOTS,
-        "execution_timeout": DEFAULT_TASK_EXECUTION_TIMEOUT,
-        "retry_delay": DEFAULT_RETRY_DELAY,
-        "retry_exponential_backoff": False,
-        "priority_weight": DEFAULT_PRIORITY_WEIGHT,
-        "weight_rule": DEFAULT_WEIGHT_RULE,
-        "inlets": [],
-        "outlets": [],
-    }
+    # Populate kwargs from DAG-level default args.
+    partial_kwargs.update((k, v) for k, v in dag_default_args.items() if partial_kwargs.get(k) is NOTSET)
 
-    # Override NOTSET kwargs by dag default values
-    for k, v in default_partial_kwargs.items():
-        if partial_kwargs.get(k) is NOTSET:
-            partial_kwargs[k] = v
-
-    # Override NOTSET kwargs which don't have a dag default value by Airflow default value or None
-    partial_kwargs = {
-        k: v if v is not NOTSET else DEFAULT_VALUES.get(k, None) for k, v in partial_kwargs.items()
-    }
+    # Populate kwargs from Airflow default constants.
+    partial_kwargs.update((k, v) for k, v in _PARTIAL_DEFAULTS.items() if partial_kwargs.get(k) is NOTSET)

Review Comment:
   > the previous version was replacing the value `NOTSET` with `None` if it doesn't have a default value in `DEFAULT_VALUES`
   
   Ah OK, I misunderstood how the default values are organised. Will update.
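
The difference between the two versions in the hunk above can be reproduced in isolation. This is an illustrative reduction: the `NOTSET` sentinel is a local stand-in, and the two-entry `DEFAULT_VALUES` table and its values are made up for the example rather than taken from Airflow.

```python
from __future__ import annotations

from typing import Any


class _ArgNotSet:
    """Stand-in sentinel type; distinct from None so that None stays a usable value."""

    def __repr__(self) -> str:
        return "NOTSET"


NOTSET = _ArgNotSet()

# Illustrative subset with made-up values; the real table has many more entries.
DEFAULT_VALUES: dict[str, Any] = {"retries": 0, "queue": "default"}

partial_kwargs: dict[str, Any] = {
    "retries": NOTSET,  # not provided, has a default
    "queue": "high",    # provided explicitly
    "sla": NOTSET,      # not provided, has no default
    "email": None,      # explicitly None
}

# Previous version: every NOTSET becomes its default, or None when there is none.
resolved = {
    k: v if v is not NOTSET else DEFAULT_VALUES.get(k, None)
    for k, v in partial_kwargs.items()
}

# Update-only version: keys without an entry in the table are left as NOTSET.
updated = dict(partial_kwargs)
updated.update((k, v) for k, v in DEFAULT_VALUES.items() if updated.get(k) is NOTSET)

print(resolved)
print(updated)
```

With these inputs `resolved["sla"]` ends up as `None`, whereas `updated["sla"]` is still the sentinel, which is the gap the quoted sentence points out.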





[GitHub] [airflow] hussein-awala commented on a diff in pull request #29913: Fix mapped tasks partial arguments when DAG default args are provided

Posted by "hussein-awala (via GitHub)" <gi...@apache.org>.
hussein-awala commented on code in PR #29913:
URL: https://github.com/apache/airflow/pull/29913#discussion_r1154923921


##########
airflow/models/baseoperator.py:
##########
@@ -190,42 +191,42 @@ def partial(
     task_id: str,
     dag: DAG | None = None,
     task_group: TaskGroup | None = None,
-    start_date: datetime | None = None,
-    end_date: datetime | None = None,
-    owner: str = DEFAULT_OWNER,
-    email: None | str | Iterable[str] = None,
+    start_date: datetime | None | ArgNotSet = NOTSET,
+    end_date: datetime | None | ArgNotSet = NOTSET,
+    owner: str | None | ArgNotSet = NOTSET,

Review Comment:
   @uranusjr can you check it now?
   
   I used `BaseOperator.__init__` as a reference to determine whether each argument can be `None`, and I used `NOTSET` for all the arguments except `dag`, `task_group` and `params`, which are not overridden by `default_partial_kwargs`.
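
A small sketch of why a sentinel default is useful for arguments that may legitimately be `None`. Everything here is hypothetical: `ArgNotSet`, `NOTSET`, `DAG_DEFAULT_ARGS` and `partial_like` are local stand-ins written for the example, not Airflow's own objects.

```python
from __future__ import annotations


class ArgNotSet:
    """Hypothetical sentinel type meaning 'the caller passed nothing'."""


NOTSET = ArgNotSet()

# Assumed DAG-level defaults for the illustration.
DAG_DEFAULT_ARGS = {"email": "alerts@example.com"}


def partial_like(email: str | None | ArgNotSet = NOTSET) -> str | None:
    """Resolve `email`, falling back to the DAG default only when it was omitted."""
    if isinstance(email, ArgNotSet):
        return DAG_DEFAULT_ARGS.get("email")
    # An explicit None is a real value and must not be replaced by the default.
    return email


omitted = partial_like()
explicit_none = partial_like(email=None)
explicit_value = partial_like(email="me@example.com")
print(omitted, explicit_none, explicit_value)
```

With a plain `None` default, the first two calls would be indistinguishable, so an explicit `None` could never override a DAG-level value.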





[GitHub] [airflow] hussein-awala merged pull request #29913: Fix mapped tasks partial arguments when DAG default args are provided

Posted by "hussein-awala (via GitHub)" <gi...@apache.org>.
hussein-awala merged PR #29913:
URL: https://github.com/apache/airflow/pull/29913




[GitHub] [airflow] uranusjr commented on a diff in pull request #29913: Fix mapped tasks partial arguments when DAG default args are provided

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29913:
URL: https://github.com/apache/airflow/pull/29913#discussion_r1132213843


##########
airflow/models/baseoperator.py:
##########
@@ -240,72 +240,123 @@ def partial(
         task_id = task_group.child_id(task_id)
 
     # Merge DAG and task group level defaults into user-supplied values.
-    partial_kwargs, partial_params = get_merged_defaults(
+    default_partial_kwargs, partial_params = get_merged_defaults(
         dag=dag,
         task_group=task_group,
         task_params=params,
         task_default_args=kwargs.pop("default_args", None),
     )
-    partial_kwargs.update(kwargs)
-
-    # Always fully populate partial kwargs to exclude them from map().
-    partial_kwargs.setdefault("dag", dag)
-    partial_kwargs.setdefault("task_group", task_group)
-    partial_kwargs.setdefault("task_id", task_id)
-    partial_kwargs.setdefault("start_date", start_date)
-    partial_kwargs.setdefault("end_date", end_date)
-    partial_kwargs.setdefault("owner", owner)
-    partial_kwargs.setdefault("email", email)
-    partial_kwargs.setdefault("trigger_rule", trigger_rule)
-    partial_kwargs.setdefault("depends_on_past", depends_on_past)
-    partial_kwargs.setdefault("ignore_first_depends_on_past", ignore_first_depends_on_past)
-    partial_kwargs.setdefault("wait_for_past_depends_before_skipping", wait_for_past_depends_before_skipping)
-    partial_kwargs.setdefault("wait_for_downstream", wait_for_downstream)
-    partial_kwargs.setdefault("retries", retries)
-    partial_kwargs.setdefault("queue", queue)
-    partial_kwargs.setdefault("pool", pool)
-    partial_kwargs.setdefault("pool_slots", pool_slots)
-    partial_kwargs.setdefault("execution_timeout", execution_timeout)
-    partial_kwargs.setdefault("max_retry_delay", max_retry_delay)
-    partial_kwargs.setdefault("retry_delay", retry_delay)
-    partial_kwargs.setdefault("retry_exponential_backoff", retry_exponential_backoff)
-    partial_kwargs.setdefault("priority_weight", priority_weight)
-    partial_kwargs.setdefault("weight_rule", weight_rule)
-    partial_kwargs.setdefault("sla", sla)
-    partial_kwargs.setdefault("max_active_tis_per_dag", max_active_tis_per_dag)
-    partial_kwargs.setdefault("on_execute_callback", on_execute_callback)
-    partial_kwargs.setdefault("on_failure_callback", on_failure_callback)
-    partial_kwargs.setdefault("on_retry_callback", on_retry_callback)
-    partial_kwargs.setdefault("on_success_callback", on_success_callback)
-    partial_kwargs.setdefault("run_as_user", run_as_user)
-    partial_kwargs.setdefault("executor_config", executor_config)
-    partial_kwargs.setdefault("inlets", inlets or [])
-    partial_kwargs.setdefault("outlets", outlets or [])
-    partial_kwargs.setdefault("resources", resources)
-    partial_kwargs.setdefault("doc", doc)
-    partial_kwargs.setdefault("doc_json", doc_json)
-    partial_kwargs.setdefault("doc_md", doc_md)
-    partial_kwargs.setdefault("doc_rst", doc_rst)
-    partial_kwargs.setdefault("doc_yaml", doc_yaml)
+
+    # Create partial_kwargs from args and kwargs
+    partial_kwargs = {
+        **kwargs,
+        "dag": dag,
+        "task_group": task_group,
+        "task_id": task_id,
+        "start_date": start_date,
+        "end_date": end_date,
+        "owner": owner,
+        "email": email,
+        "trigger_rule": trigger_rule,
+        "depends_on_past": depends_on_past,
+        "ignore_first_depends_on_past": ignore_first_depends_on_past,
+        "wait_for_past_depends_before_skipping": wait_for_past_depends_before_skipping,
+        "wait_for_downstream": wait_for_downstream,
+        "retries": retries,
+        "queue": queue,
+        "pool": pool,
+        "pool_slots": pool_slots,
+        "execution_timeout": execution_timeout,
+        "max_retry_delay": max_retry_delay,
+        "retry_delay": retry_delay,
+        "retry_exponential_backoff": retry_exponential_backoff,
+        "priority_weight": priority_weight,
+        "weight_rule": weight_rule,
+        "sla": sla,
+        "max_active_tis_per_dag": max_active_tis_per_dag,
+        "on_execute_callback": on_execute_callback,
+        "on_failure_callback": on_failure_callback,
+        "on_retry_callback": on_retry_callback,
+        "on_success_callback": on_success_callback,
+        "run_as_user": run_as_user,
+        "executor_config": executor_config,
+        "inlets": inlets,
+        "outlets": outlets,
+        "resources": resources,
+        "doc": doc,
+        "doc_json": doc_json,
+        "doc_md": doc_md,
+        "doc_rst": doc_rst,
+        "doc_yaml": doc_yaml,
+    }
+
+    # Override None kwargs by dag default values
+    for k, v in default_partial_kwargs.items():
+        if partial_kwargs.get(k) is None:
+            partial_kwargs[k] = v
+
+    # Override None kwargs which don't have a dag default value by Airflow default value
+    partial_kwargs["owner"] = partial_kwargs["owner"] or DEFAULT_OWNER
+    partial_kwargs["trigger_rule"] = partial_kwargs["trigger_rule"] or DEFAULT_TRIGGER_RULE
+    partial_kwargs["depends_on_past"] = (
+        partial_kwargs["depends_on_past"] if partial_kwargs["depends_on_past"] is not None else False
+    )
+    partial_kwargs["ignore_first_depends_on_past"] = (
+        partial_kwargs["ignore_first_depends_on_past"]
+        if partial_kwargs["ignore_first_depends_on_past"] is not None
+        else DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST
+    )
+    partial_kwargs["wait_for_past_depends_before_skipping"] = (
+        partial_kwargs["wait_for_past_depends_before_skipping"]
+        if partial_kwargs["wait_for_past_depends_before_skipping"] is not None
+        else DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING
+    )
+    partial_kwargs["wait_for_downstream"] = (
+        partial_kwargs["wait_for_downstream"] if partial_kwargs["wait_for_downstream"] is not None else False
+    )
+    partial_kwargs["retries"] = (
+        partial_kwargs["retries"] if partial_kwargs["retries"] is not None else DEFAULT_RETRIES
+    )
+    partial_kwargs["queue"] = partial_kwargs["queue"] or DEFAULT_QUEUE
+    partial_kwargs["pool_slots"] = (
+        partial_kwargs["pool_slots"] if partial_kwargs["pool_slots"] is not None else DEFAULT_POOL_SLOTS
+    )
+    partial_kwargs["execution_timeout"] = (
+        partial_kwargs["execution_timeout"] or DEFAULT_TASK_EXECUTION_TIMEOUT
+    )
+    partial_kwargs["retry_delay"] = (
+        partial_kwargs["retry_delay"] if partial_kwargs["retry_delay"] is not None else DEFAULT_RETRY_DELAY
+    )
+    partial_kwargs["retry_exponential_backoff"] = (
+        partial_kwargs["retry_exponential_backoff"] if partial_kwargs["retry_exponential_backoff"] else False
+    )
+    partial_kwargs["priority_weight"] = (
+        partial_kwargs["priority_weight"] if partial_kwargs["priority_weight"] else DEFAULT_PRIORITY_WEIGHT
+    )
+    partial_kwargs["weight_rule"] = partial_kwargs["weight_rule"] or DEFAULT_WEIGHT_RULE
+    partial_kwargs["inlets"] = partial_kwargs["inlets"] or []
+    partial_kwargs["outlets"] = partial_kwargs["outlets"] or []
 
     # Post-process arguments. Should be kept in sync with _TaskDecorator.expand().
     if "task_concurrency" in kwargs:  # Reject deprecated option.
         raise TypeError("unexpected argument: task_concurrency")
     if partial_kwargs["wait_for_downstream"]:
         partial_kwargs["depends_on_past"] = True
-    partial_kwargs["start_date"] = timezone.convert_to_utc(partial_kwargs["start_date"])
-    partial_kwargs["end_date"] = timezone.convert_to_utc(partial_kwargs["end_date"])
+    partial_kwargs["start_date"] = timezone.convert_to_utc(partial_kwargs["start_date"])  # type: ignore
+    partial_kwargs["end_date"] = timezone.convert_to_utc(partial_kwargs["end_date"])  # type: ignore

Review Comment:
   Why are these ignores needed?





[GitHub] [airflow] hussein-awala commented on a diff in pull request #29913: Fix mapped tasks partial arguments when DAG default args are provided

Posted by "hussein-awala (via GitHub)" <gi...@apache.org>.
hussein-awala commented on code in PR #29913:
URL: https://github.com/apache/airflow/pull/29913#discussion_r1133009097


##########
airflow/models/baseoperator.py:
##########
@@ -240,72 +240,123 @@ def partial(
         task_id = task_group.child_id(task_id)
 
     # Merge DAG and task group level defaults into user-supplied values.
-    partial_kwargs, partial_params = get_merged_defaults(
+    default_partial_kwargs, partial_params = get_merged_defaults(
         dag=dag,
         task_group=task_group,
         task_params=params,
         task_default_args=kwargs.pop("default_args", None),
     )
-    partial_kwargs.update(kwargs)
-
-    # Always fully populate partial kwargs to exclude them from map().
-    partial_kwargs.setdefault("dag", dag)
-    partial_kwargs.setdefault("task_group", task_group)
-    partial_kwargs.setdefault("task_id", task_id)
-    partial_kwargs.setdefault("start_date", start_date)
-    partial_kwargs.setdefault("end_date", end_date)
-    partial_kwargs.setdefault("owner", owner)
-    partial_kwargs.setdefault("email", email)
-    partial_kwargs.setdefault("trigger_rule", trigger_rule)
-    partial_kwargs.setdefault("depends_on_past", depends_on_past)
-    partial_kwargs.setdefault("ignore_first_depends_on_past", ignore_first_depends_on_past)
-    partial_kwargs.setdefault("wait_for_past_depends_before_skipping", wait_for_past_depends_before_skipping)
-    partial_kwargs.setdefault("wait_for_downstream", wait_for_downstream)
-    partial_kwargs.setdefault("retries", retries)
-    partial_kwargs.setdefault("queue", queue)
-    partial_kwargs.setdefault("pool", pool)
-    partial_kwargs.setdefault("pool_slots", pool_slots)
-    partial_kwargs.setdefault("execution_timeout", execution_timeout)
-    partial_kwargs.setdefault("max_retry_delay", max_retry_delay)
-    partial_kwargs.setdefault("retry_delay", retry_delay)
-    partial_kwargs.setdefault("retry_exponential_backoff", retry_exponential_backoff)
-    partial_kwargs.setdefault("priority_weight", priority_weight)
-    partial_kwargs.setdefault("weight_rule", weight_rule)
-    partial_kwargs.setdefault("sla", sla)
-    partial_kwargs.setdefault("max_active_tis_per_dag", max_active_tis_per_dag)
-    partial_kwargs.setdefault("on_execute_callback", on_execute_callback)
-    partial_kwargs.setdefault("on_failure_callback", on_failure_callback)
-    partial_kwargs.setdefault("on_retry_callback", on_retry_callback)
-    partial_kwargs.setdefault("on_success_callback", on_success_callback)
-    partial_kwargs.setdefault("run_as_user", run_as_user)
-    partial_kwargs.setdefault("executor_config", executor_config)
-    partial_kwargs.setdefault("inlets", inlets or [])
-    partial_kwargs.setdefault("outlets", outlets or [])
-    partial_kwargs.setdefault("resources", resources)
-    partial_kwargs.setdefault("doc", doc)
-    partial_kwargs.setdefault("doc_json", doc_json)
-    partial_kwargs.setdefault("doc_md", doc_md)
-    partial_kwargs.setdefault("doc_rst", doc_rst)
-    partial_kwargs.setdefault("doc_yaml", doc_yaml)
+
+    # Create partial_kwargs from args and kwargs
+    partial_kwargs = {
+        **kwargs,
+        "dag": dag,
+        "task_group": task_group,
+        "task_id": task_id,
+        "start_date": start_date,
+        "end_date": end_date,
+        "owner": owner,
+        "email": email,
+        "trigger_rule": trigger_rule,
+        "depends_on_past": depends_on_past,
+        "ignore_first_depends_on_past": ignore_first_depends_on_past,
+        "wait_for_past_depends_before_skipping": wait_for_past_depends_before_skipping,
+        "wait_for_downstream": wait_for_downstream,
+        "retries": retries,
+        "queue": queue,
+        "pool": pool,
+        "pool_slots": pool_slots,
+        "execution_timeout": execution_timeout,
+        "max_retry_delay": max_retry_delay,
+        "retry_delay": retry_delay,
+        "retry_exponential_backoff": retry_exponential_backoff,
+        "priority_weight": priority_weight,
+        "weight_rule": weight_rule,
+        "sla": sla,
+        "max_active_tis_per_dag": max_active_tis_per_dag,
+        "on_execute_callback": on_execute_callback,
+        "on_failure_callback": on_failure_callback,
+        "on_retry_callback": on_retry_callback,
+        "on_success_callback": on_success_callback,
+        "run_as_user": run_as_user,
+        "executor_config": executor_config,
+        "inlets": inlets,
+        "outlets": outlets,
+        "resources": resources,
+        "doc": doc,
+        "doc_json": doc_json,
+        "doc_md": doc_md,
+        "doc_rst": doc_rst,
+        "doc_yaml": doc_yaml,
+    }
+
+    # Override None kwargs by dag default values
+    for k, v in default_partial_kwargs.items():
+        if partial_kwargs.get(k) is None:
+            partial_kwargs[k] = v
+
+    # Override None kwargs which don't have a dag default value by Airflow default value
+    partial_kwargs["owner"] = partial_kwargs["owner"] or DEFAULT_OWNER
+    partial_kwargs["trigger_rule"] = partial_kwargs["trigger_rule"] or DEFAULT_TRIGGER_RULE
+    partial_kwargs["depends_on_past"] = (
+        partial_kwargs["depends_on_past"] if partial_kwargs["depends_on_past"] is not None else False
+    )
+    partial_kwargs["ignore_first_depends_on_past"] = (
+        partial_kwargs["ignore_first_depends_on_past"]
+        if partial_kwargs["ignore_first_depends_on_past"] is not None
+        else DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST
+    )
+    partial_kwargs["wait_for_past_depends_before_skipping"] = (
+        partial_kwargs["wait_for_past_depends_before_skipping"]
+        if partial_kwargs["wait_for_past_depends_before_skipping"] is not None
+        else DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING
+    )
+    partial_kwargs["wait_for_downstream"] = (
+        partial_kwargs["wait_for_downstream"] if partial_kwargs["wait_for_downstream"] is not None else False
+    )
+    partial_kwargs["retries"] = (
+        partial_kwargs["retries"] if partial_kwargs["retries"] is not None else DEFAULT_RETRIES
+    )
+    partial_kwargs["queue"] = partial_kwargs["queue"] or DEFAULT_QUEUE
+    partial_kwargs["pool_slots"] = (
+        partial_kwargs["pool_slots"] if partial_kwargs["pool_slots"] is not None else DEFAULT_POOL_SLOTS
+    )
+    partial_kwargs["execution_timeout"] = (
+        partial_kwargs["execution_timeout"] or DEFAULT_TASK_EXECUTION_TIMEOUT
+    )
+    partial_kwargs["retry_delay"] = (
+        partial_kwargs["retry_delay"] if partial_kwargs["retry_delay"] is not None else DEFAULT_RETRY_DELAY
+    )
+    partial_kwargs["retry_exponential_backoff"] = (
+        partial_kwargs["retry_exponential_backoff"] if partial_kwargs["retry_exponential_backoff"] else False
+    )
+    partial_kwargs["priority_weight"] = (
+        partial_kwargs["priority_weight"] if partial_kwargs["priority_weight"] else DEFAULT_PRIORITY_WEIGHT
+    )
+    partial_kwargs["weight_rule"] = partial_kwargs["weight_rule"] or DEFAULT_WEIGHT_RULE
+    partial_kwargs["inlets"] = partial_kwargs["inlets"] or []
+    partial_kwargs["outlets"] = partial_kwargs["outlets"] or []
 
     # Post-process arguments. Should be kept in sync with _TaskDecorator.expand().
     if "task_concurrency" in kwargs:  # Reject deprecated option.
         raise TypeError("unexpected argument: task_concurrency")
     if partial_kwargs["wait_for_downstream"]:
         partial_kwargs["depends_on_past"] = True
-    partial_kwargs["start_date"] = timezone.convert_to_utc(partial_kwargs["start_date"])
-    partial_kwargs["end_date"] = timezone.convert_to_utc(partial_kwargs["end_date"])
+    partial_kwargs["start_date"] = timezone.convert_to_utc(partial_kwargs["start_date"])  # type: ignore
+    partial_kwargs["end_date"] = timezone.convert_to_utc(partial_kwargs["end_date"])  # type: ignore

Review Comment:
   Since I didn't define the type of `partial_kwargs`, the type checker inferred it as `Dict[str, <combination of all the value types>]`, which is not compatible with the functions used, so I fixed it by annotating the type as `Dict[str, Any]`.
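
The typing issue described in this comment can be reproduced outside Airflow. The following is a minimal sketch, not the PR's code: `to_utc` is a hypothetical stand-in for a converter such as `timezone.convert_to_utc`, and the dictionary keys and values are illustrative only. It assumes a checker such as mypy, which typically infers a wide value type for a dict literal with mixed values.

```python
from __future__ import annotations

from datetime import datetime, timezone
from typing import Any


def to_utc(value: datetime | None) -> datetime | None:
    """Hypothetical converter that accepts only a datetime or None."""
    if value is None:
        return None
    if value.tzinfo is None:
        # Treat naive datetimes as UTC for the purpose of this sketch.
        return value.replace(tzinfo=timezone.utc)
    return value.astimezone(timezone.utc)


# No annotation: the checker has to infer one value type for str, int and
# datetime values, so passing inferred["start_date"] to to_utc() would
# likely be reported as an incompatible argument type.
inferred = {"task_id": "t1", "retries": 3, "start_date": datetime(2023, 3, 4)}

# Explicit annotation: values are typed as Any, so the call type-checks.
annotated: dict[str, Any] = {
    "task_id": "t1",
    "retries": 3,
    "start_date": datetime(2023, 3, 4),
}
annotated["start_date"] = to_utc(annotated["start_date"])
```

The trade-off is that `Any` switches off checking for every value in the dictionary, which is why the annotation is usually preferred over scattering `# type: ignore` comments but is still a loosening of the types.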





[GitHub] [airflow] potiuk commented on pull request #29913: Fix mapped tasks partial arguments when DAG default args are provided

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #29913:
URL: https://github.com/apache/airflow/pull/29913#issuecomment-1454928157

   cc: @uranusjr. This definitely needs your insight :)




[GitHub] [airflow] mpgreg commented on pull request #29913: Fix mapped tasks partial arguments when DAG default args are provided

Posted by "mpgreg (via GitHub)" <gi...@apache.org>.
mpgreg commented on PR #29913:
URL: https://github.com/apache/airflow/pull/29913#issuecomment-1663999330

   If I remove the default_args it runs okay.




[GitHub] [airflow] uranusjr commented on a diff in pull request #29913: Fix mapped tasks partial arguments when DAG default args are provided

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29913:
URL: https://github.com/apache/airflow/pull/29913#discussion_r1132213509


##########
airflow/models/baseoperator.py:
##########
@@ -240,72 +240,123 @@ def partial(
         task_id = task_group.child_id(task_id)
 
     # Merge DAG and task group level defaults into user-supplied values.
-    partial_kwargs, partial_params = get_merged_defaults(
+    default_partial_kwargs, partial_params = get_merged_defaults(
         dag=dag,
         task_group=task_group,
         task_params=params,
         task_default_args=kwargs.pop("default_args", None),
     )
-    partial_kwargs.update(kwargs)
-
-    # Always fully populate partial kwargs to exclude them from map().
-    partial_kwargs.setdefault("dag", dag)
-    partial_kwargs.setdefault("task_group", task_group)
-    partial_kwargs.setdefault("task_id", task_id)
-    partial_kwargs.setdefault("start_date", start_date)
-    partial_kwargs.setdefault("end_date", end_date)
-    partial_kwargs.setdefault("owner", owner)
-    partial_kwargs.setdefault("email", email)
-    partial_kwargs.setdefault("trigger_rule", trigger_rule)
-    partial_kwargs.setdefault("depends_on_past", depends_on_past)
-    partial_kwargs.setdefault("ignore_first_depends_on_past", ignore_first_depends_on_past)
-    partial_kwargs.setdefault("wait_for_past_depends_before_skipping", wait_for_past_depends_before_skipping)
-    partial_kwargs.setdefault("wait_for_downstream", wait_for_downstream)
-    partial_kwargs.setdefault("retries", retries)
-    partial_kwargs.setdefault("queue", queue)
-    partial_kwargs.setdefault("pool", pool)
-    partial_kwargs.setdefault("pool_slots", pool_slots)
-    partial_kwargs.setdefault("execution_timeout", execution_timeout)
-    partial_kwargs.setdefault("max_retry_delay", max_retry_delay)
-    partial_kwargs.setdefault("retry_delay", retry_delay)
-    partial_kwargs.setdefault("retry_exponential_backoff", retry_exponential_backoff)
-    partial_kwargs.setdefault("priority_weight", priority_weight)
-    partial_kwargs.setdefault("weight_rule", weight_rule)
-    partial_kwargs.setdefault("sla", sla)
-    partial_kwargs.setdefault("max_active_tis_per_dag", max_active_tis_per_dag)
-    partial_kwargs.setdefault("on_execute_callback", on_execute_callback)
-    partial_kwargs.setdefault("on_failure_callback", on_failure_callback)
-    partial_kwargs.setdefault("on_retry_callback", on_retry_callback)
-    partial_kwargs.setdefault("on_success_callback", on_success_callback)
-    partial_kwargs.setdefault("run_as_user", run_as_user)
-    partial_kwargs.setdefault("executor_config", executor_config)
-    partial_kwargs.setdefault("inlets", inlets or [])
-    partial_kwargs.setdefault("outlets", outlets or [])
-    partial_kwargs.setdefault("resources", resources)
-    partial_kwargs.setdefault("doc", doc)
-    partial_kwargs.setdefault("doc_json", doc_json)
-    partial_kwargs.setdefault("doc_md", doc_md)
-    partial_kwargs.setdefault("doc_rst", doc_rst)
-    partial_kwargs.setdefault("doc_yaml", doc_yaml)
+
+    # Create partial_kwargs from args and kwargs
+    partial_kwargs = {
+        **kwargs,
+        "dag": dag,
+        "task_group": task_group,
+        "task_id": task_id,
+        "start_date": start_date,
+        "end_date": end_date,
+        "owner": owner,
+        "email": email,
+        "trigger_rule": trigger_rule,
+        "depends_on_past": depends_on_past,
+        "ignore_first_depends_on_past": ignore_first_depends_on_past,
+        "wait_for_past_depends_before_skipping": wait_for_past_depends_before_skipping,
+        "wait_for_downstream": wait_for_downstream,
+        "retries": retries,
+        "queue": queue,
+        "pool": pool,
+        "pool_slots": pool_slots,
+        "execution_timeout": execution_timeout,
+        "max_retry_delay": max_retry_delay,
+        "retry_delay": retry_delay,
+        "retry_exponential_backoff": retry_exponential_backoff,
+        "priority_weight": priority_weight,
+        "weight_rule": weight_rule,
+        "sla": sla,
+        "max_active_tis_per_dag": max_active_tis_per_dag,
+        "on_execute_callback": on_execute_callback,
+        "on_failure_callback": on_failure_callback,
+        "on_retry_callback": on_retry_callback,
+        "on_success_callback": on_success_callback,
+        "run_as_user": run_as_user,
+        "executor_config": executor_config,
+        "inlets": inlets,
+        "outlets": outlets,
+        "resources": resources,
+        "doc": doc,
+        "doc_json": doc_json,
+        "doc_md": doc_md,
+        "doc_rst": doc_rst,
+        "doc_yaml": doc_yaml,
+    }
+
+    # Override None kwargs by dag default values
+    for k, v in default_partial_kwargs.items():
+        if partial_kwargs.get(k) is None:
+            partial_kwargs[k] = v
+
+    # Override None kwargs which don't have a dag default value by Airflow default value
+    partial_kwargs["owner"] = partial_kwargs["owner"] or DEFAULT_OWNER

Review Comment:
   These are fine, but I wonder if this can be done better, for example with a dict of defaults and a comprehension like this:
   
   ```python
   partial_kwargs = {
       k: v if v is not None else DEFAULT_VALUES.get(k, v)
       for k, v in partial_kwargs.items()
   }
   ```
   
   And would `ArgNotSet` be better as a sentinel than `None`?
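
To illustrate the two suggestions in this comment together, here is a minimal sketch under stated assumptions: `ArgNotSetSketch`, `NOTSET`, `DEFAULT_VALUES` and `fill_defaults` are hypothetical stand-ins written for this example, and the default values are illustrative rather than Airflow's real constants. It shows why a dedicated sentinel can be preferable to `None`: an explicit `None` or `0` passed by the user stays distinguishable from "not provided".

```python
from __future__ import annotations

from typing import Any


class ArgNotSetSketch:
    """Hypothetical sentinel type meaning 'argument was not provided'."""


NOTSET = ArgNotSetSketch()

# Illustrative built-in defaults; not Airflow's actual values.
DEFAULT_VALUES: dict[str, Any] = {"owner": "airflow", "retries": 0}


def fill_defaults(user_kwargs: dict[str, Any], dag_defaults: dict[str, Any]) -> dict[str, Any]:
    merged = dict(user_kwargs)
    # DAG-level defaults fill only the keys the user left unset.
    for key, value in dag_defaults.items():
        if merged.get(key, NOTSET) is NOTSET:
            merged[key] = value
    # Anything still unset falls back to a built-in default, or to None.
    return {k: (DEFAULT_VALUES.get(k) if v is NOTSET else v) for k, v in merged.items()}


result = fill_defaults(
    {"owner": NOTSET, "retries": 0, "sla": None, "doc": NOTSET},
    {"owner": "team", "retries": 5},
)
```

In this sketch the explicit `retries=0` and `sla=None` survive the merge, `owner` is taken from the DAG-level defaults, and `doc` ends up as `None` because it has no default at either level.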





[GitHub] [airflow] hussein-awala commented on a diff in pull request #29913: Fix mapped tasks partial arguments when DAG default args are provided

Posted by "hussein-awala (via GitHub)" <gi...@apache.org>.
hussein-awala commented on code in PR #29913:
URL: https://github.com/apache/airflow/pull/29913#discussion_r1161790649


##########
airflow/models/baseoperator.py:
##########
@@ -292,34 +312,11 @@ def partial(
         "doc_yaml": doc_yaml,
     }
 
-    DEFAULT_VALUES: dict[str, Any] = {
-        "owner": DEFAULT_OWNER,
-        "trigger_rule": DEFAULT_TRIGGER_RULE,
-        "depends_on_past": False,
-        "ignore_first_depends_on_past": DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST,
-        "wait_for_past_depends_before_skipping": DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING,
-        "wait_for_downstream": False,
-        "retries": DEFAULT_RETRIES,
-        "queue": DEFAULT_QUEUE,
-        "pool_slots": DEFAULT_POOL_SLOTS,
-        "execution_timeout": DEFAULT_TASK_EXECUTION_TIMEOUT,
-        "retry_delay": DEFAULT_RETRY_DELAY,
-        "retry_exponential_backoff": False,
-        "priority_weight": DEFAULT_PRIORITY_WEIGHT,
-        "weight_rule": DEFAULT_WEIGHT_RULE,
-        "inlets": [],
-        "outlets": [],
-    }
+    # Populate kwargs from DAG-level default args.
+    partial_kwargs.update((k, v) for k, v in dag_default_args.items() if partial_kwargs.get(k) is NOTSET)
 
-    # Override NOTSET kwargs by dag default values
-    for k, v in default_partial_kwargs.items():
-        if partial_kwargs.get(k) is NOTSET:
-            partial_kwargs[k] = v
-
-    # Override NOTSET kwargs which don't have a dag default value by Airflow default value or None
-    partial_kwargs = {
-        k: v if v is not NOTSET else DEFAULT_VALUES.get(k, None) for k, v in partial_kwargs.items()
-    }
+    # Populate kwargs from Airflow default constants.
+    partial_kwargs.update((k, v) for k, v in _PARTIAL_DEFAULTS.items() if partial_kwargs.get(k) is NOTSET)

Review Comment:
   @uranusjr I'm not sure this is safe.
   The previous version replaced a `NOTSET` value with `None` when the key had no default in `DEFAULT_VALUES`, but this version loops over `_PARTIAL_DEFAULTS` and replaces only the `NOTSET` values that have a default there. Keys without a default (e.g. `doc_*`, `on_*_callback`, ...) would stay `NOTSET`, and I don't think the base operator can process `NOTSET` without problems.
   WDYT?
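
The concern raised in this comment can be shown with a small self-contained sketch. The names below mirror the diff, but `NOTSET` is a plain `object()` stand-in and the `_PARTIAL_DEFAULTS` contents are illustrative assumptions, not Airflow's real values.

```python
from __future__ import annotations

from typing import Any

NOTSET = object()  # hypothetical stand-in for the real sentinel

# Illustrative defaults only; note there is no entry for "doc_md".
_PARTIAL_DEFAULTS: dict[str, Any] = {"retries": 0, "queue": "default"}


def fill_with_update(partial_kwargs: dict[str, Any]) -> dict[str, Any]:
    """Newer approach: only keys that have a default are touched."""
    filled = dict(partial_kwargs)
    filled.update((k, v) for k, v in _PARTIAL_DEFAULTS.items() if filled.get(k) is NOTSET)
    return filled


def fill_with_comprehension(partial_kwargs: dict[str, Any]) -> dict[str, Any]:
    """Earlier approach: every NOTSET value becomes a default or None."""
    return {
        k: v if v is not NOTSET else _PARTIAL_DEFAULTS.get(k, None)
        for k, v in partial_kwargs.items()
    }


kwargs = {"retries": NOTSET, "queue": "high", "doc_md": NOTSET}

# Keys that still hold the sentinel after the update-based fill.
leftover = [k for k, v in fill_with_update(kwargs).items() if v is NOTSET]
normalized = fill_with_comprehension(kwargs)
```

Here `leftover` contains `doc_md`, the key without a default, while the comprehension-based version maps it to `None`. If the update-based form is kept, one option is a final pass that converts any remaining sentinel to `None` before the values reach the operator.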





[GitHub] [airflow] hussein-awala commented on a diff in pull request #29913: Fix mapped tasks partial arguments when DAG default args are provided

Posted by "hussein-awala (via GitHub)" <gi...@apache.org>.
hussein-awala commented on code in PR #29913:
URL: https://github.com/apache/airflow/pull/29913#discussion_r1133012327


##########
airflow/models/baseoperator.py:
##########
@@ -240,72 +240,123 @@ def partial(
         task_id = task_group.child_id(task_id)
 
     # Merge DAG and task group level defaults into user-supplied values.
-    partial_kwargs, partial_params = get_merged_defaults(
+    default_partial_kwargs, partial_params = get_merged_defaults(
         dag=dag,
         task_group=task_group,
         task_params=params,
         task_default_args=kwargs.pop("default_args", None),
     )
-    partial_kwargs.update(kwargs)
-
-    # Always fully populate partial kwargs to exclude them from map().
-    partial_kwargs.setdefault("dag", dag)
-    partial_kwargs.setdefault("task_group", task_group)
-    partial_kwargs.setdefault("task_id", task_id)
-    partial_kwargs.setdefault("start_date", start_date)
-    partial_kwargs.setdefault("end_date", end_date)
-    partial_kwargs.setdefault("owner", owner)
-    partial_kwargs.setdefault("email", email)
-    partial_kwargs.setdefault("trigger_rule", trigger_rule)
-    partial_kwargs.setdefault("depends_on_past", depends_on_past)
-    partial_kwargs.setdefault("ignore_first_depends_on_past", ignore_first_depends_on_past)
-    partial_kwargs.setdefault("wait_for_past_depends_before_skipping", wait_for_past_depends_before_skipping)
-    partial_kwargs.setdefault("wait_for_downstream", wait_for_downstream)
-    partial_kwargs.setdefault("retries", retries)
-    partial_kwargs.setdefault("queue", queue)
-    partial_kwargs.setdefault("pool", pool)
-    partial_kwargs.setdefault("pool_slots", pool_slots)
-    partial_kwargs.setdefault("execution_timeout", execution_timeout)
-    partial_kwargs.setdefault("max_retry_delay", max_retry_delay)
-    partial_kwargs.setdefault("retry_delay", retry_delay)
-    partial_kwargs.setdefault("retry_exponential_backoff", retry_exponential_backoff)
-    partial_kwargs.setdefault("priority_weight", priority_weight)
-    partial_kwargs.setdefault("weight_rule", weight_rule)
-    partial_kwargs.setdefault("sla", sla)
-    partial_kwargs.setdefault("max_active_tis_per_dag", max_active_tis_per_dag)
-    partial_kwargs.setdefault("on_execute_callback", on_execute_callback)
-    partial_kwargs.setdefault("on_failure_callback", on_failure_callback)
-    partial_kwargs.setdefault("on_retry_callback", on_retry_callback)
-    partial_kwargs.setdefault("on_success_callback", on_success_callback)
-    partial_kwargs.setdefault("run_as_user", run_as_user)
-    partial_kwargs.setdefault("executor_config", executor_config)
-    partial_kwargs.setdefault("inlets", inlets or [])
-    partial_kwargs.setdefault("outlets", outlets or [])
-    partial_kwargs.setdefault("resources", resources)
-    partial_kwargs.setdefault("doc", doc)
-    partial_kwargs.setdefault("doc_json", doc_json)
-    partial_kwargs.setdefault("doc_md", doc_md)
-    partial_kwargs.setdefault("doc_rst", doc_rst)
-    partial_kwargs.setdefault("doc_yaml", doc_yaml)
+
+    # Create partial_kwargs from args and kwargs
+    partial_kwargs = {
+        **kwargs,
+        "dag": dag,
+        "task_group": task_group,
+        "task_id": task_id,
+        "start_date": start_date,
+        "end_date": end_date,
+        "owner": owner,
+        "email": email,
+        "trigger_rule": trigger_rule,
+        "depends_on_past": depends_on_past,
+        "ignore_first_depends_on_past": ignore_first_depends_on_past,
+        "wait_for_past_depends_before_skipping": wait_for_past_depends_before_skipping,
+        "wait_for_downstream": wait_for_downstream,
+        "retries": retries,
+        "queue": queue,
+        "pool": pool,
+        "pool_slots": pool_slots,
+        "execution_timeout": execution_timeout,
+        "max_retry_delay": max_retry_delay,
+        "retry_delay": retry_delay,
+        "retry_exponential_backoff": retry_exponential_backoff,
+        "priority_weight": priority_weight,
+        "weight_rule": weight_rule,
+        "sla": sla,
+        "max_active_tis_per_dag": max_active_tis_per_dag,
+        "on_execute_callback": on_execute_callback,
+        "on_failure_callback": on_failure_callback,
+        "on_retry_callback": on_retry_callback,
+        "on_success_callback": on_success_callback,
+        "run_as_user": run_as_user,
+        "executor_config": executor_config,
+        "inlets": inlets,
+        "outlets": outlets,
+        "resources": resources,
+        "doc": doc,
+        "doc_json": doc_json,
+        "doc_md": doc_md,
+        "doc_rst": doc_rst,
+        "doc_yaml": doc_yaml,
+    }
+
+    # Override None kwargs by dag default values
+    for k, v in default_partial_kwargs.items():
+        if partial_kwargs.get(k) is None:
+            partial_kwargs[k] = v
+
+    # Override None kwargs which don't have a dag default value by Airflow default value
+    partial_kwargs["owner"] = partial_kwargs["owner"] or DEFAULT_OWNER

Review Comment:
   Could you please take a second look?





[GitHub] [airflow] uranusjr commented on pull request #29913: Fix mapped tasks partial arguments when DAG default args are provided

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on PR #29913:
URL: https://github.com/apache/airflow/pull/29913#issuecomment-1465530644

   May be related: #29366

