Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/01 09:44:39 UTC
[GitHub] [airflow] uranusjr opened a new pull request #22683: Correctly apply defaults to mapped task flow
uranusjr opened a new pull request #22683:
URL: https://github.com/apache/airflow/pull/22683
Since a taskflow @task is mapped differently from classic operator
classes, we missed applying DAG and task group level arguments to it.
This adds that (and a couple of tests to make sure it sticks).
A small refactoring is also introduced to merge the two default
extraction functions into one because we don't really need two of them;
they are always used together.
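
To illustrate the layering described above, here is a minimal, self-contained sketch. It is not the Airflow implementation: `merge_defaults` and the plain dictionaries are hypothetical stand-ins, and the precedence shown (DAG defaults, then task group defaults, then arguments given on the task itself) is an assumption made for the example.

```python
from typing import Any, Mapping, Optional


def merge_defaults(
    dag_default_args: Optional[Mapping[str, Any]],
    group_default_args: Optional[Mapping[str, Any]],
    task_kwargs: Mapping[str, Any],
) -> dict:
    """Layer arguments from least to most specific; later layers win.

    Hypothetical helper for illustration only, not an Airflow API.
    """
    merged: dict = {}
    for layer in (dag_default_args, group_default_args, task_kwargs):
        if layer:  # skip layers that are None or empty
            merged.update(layer)
    return merged


# Assumed example values, not taken from the pull request.
dag_defaults = {"retries": 2, "owner": "data-team"}
group_defaults = {"retries": 5}
explicit = {"owner": "alice"}

merged = merge_defaults(dag_defaults, group_defaults, explicit)
print(merged)
```

In this sketch the task group value for `retries` overrides the DAG value, and the explicitly passed `owner` overrides both. A single function that takes every layer at once mirrors the idea of merging the two extraction functions into one.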
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] ashb commented on a change in pull request #22683: Correctly apply defaults to mapped task flow
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #22683:
URL: https://github.com/apache/airflow/pull/22683#discussion_r840650188
##########
File path: airflow/decorators/base.py
##########
@@ -305,13 +311,21 @@ def expand(self, **map_kwargs: "Mappable") -> XComArg:
         prevent_duplicates(self.kwargs, map_kwargs, fail_reason="mapping already partial")
         ensure_xcomarg_return_value(map_kwargs)
-        partial_kwargs = self.kwargs.copy()
+        task_kwargs = self.kwargs.copy()
+        dag = task_kwargs.pop("dag", None) or DagContext.get_current_dag()
+        task_group = task_kwargs.pop("task_group", None) or TaskGroupContext.get_current_task_group(dag)
+
+        partial_kwargs, default_params = get_merged_defaults(
+            dag=dag,
+            task_group=task_group,
+            task_params=task_kwargs.pop("params", None),
+            task_default_args=task_kwargs.pop("default_args", None),
Review comment:
Oh well that's just weird. I wonder why it's there.
[GitHub] [airflow] ashb commented on a change in pull request #22683: Correctly apply defaults to mapped task flow
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #22683:
URL: https://github.com/apache/airflow/pull/22683#discussion_r840430783
##########
File path: airflow/decorators/base.py
##########
@@ -305,13 +311,21 @@ def expand(self, **map_kwargs: "Mappable") -> XComArg:
         prevent_duplicates(self.kwargs, map_kwargs, fail_reason="mapping already partial")
         ensure_xcomarg_return_value(map_kwargs)
-        partial_kwargs = self.kwargs.copy()
+        task_kwargs = self.kwargs.copy()
+        dag = task_kwargs.pop("dag", None) or DagContext.get_current_dag()
+        task_group = task_kwargs.pop("task_group", None) or TaskGroupContext.get_current_task_group(dag)
+
+        partial_kwargs, default_params = get_merged_defaults(
+            dag=dag,
+            task_group=task_group,
+            task_params=task_kwargs.pop("params", None),
+            task_default_args=task_kwargs.pop("default_args", None),
Review comment:
`default_args` isn't a valid argument to Operators, is it?
[GitHub] [airflow] uranusjr commented on a change in pull request #22683: Correctly apply defaults to mapped task flow
Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #22683:
URL: https://github.com/apache/airflow/pull/22683#discussion_r840437617
##########
File path: airflow/decorators/base.py
##########
@@ -305,13 +311,21 @@ def expand(self, **map_kwargs: "Mappable") -> XComArg:
         prevent_duplicates(self.kwargs, map_kwargs, fail_reason="mapping already partial")
         ensure_xcomarg_return_value(map_kwargs)
-        partial_kwargs = self.kwargs.copy()
+        task_kwargs = self.kwargs.copy()
+        dag = task_kwargs.pop("dag", None) or DagContext.get_current_dag()
+        task_group = task_kwargs.pop("task_group", None) or TaskGroupContext.get_current_task_group(dag)
+
+        partial_kwargs, default_params = get_merged_defaults(
+            dag=dag,
+            task_group=task_group,
+            task_params=task_kwargs.pop("params", None),
+            task_default_args=task_kwargs.pop("default_args", None),
Review comment:
It actually is; there’s a `default_args` argument on BaseOperator.
[GitHub] [airflow] ashb commented on a change in pull request #22683: Correctly apply defaults to mapped task flow
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #22683:
URL: https://github.com/apache/airflow/pull/22683#discussion_r840431972
##########
File path: tests/models/test_baseoperator.py
##########
@@ -877,3 +877,30 @@ def test_expand_mapped_task_instance_skipped_on_zero(dag_maker, session):
     )
     assert indices == [(-1, TaskInstanceState.SKIPPED)]
+
+
+def test_mapped_task_applies_default_args_classic(dag_maker):
+    with dag_maker(default_args={"sla": timedelta(minutes=30)}) as dag:
+        MockOperator(task_id="simple", arg1=None, arg2=0)
+        MockOperator.partial(task_id="mapped").expand(arg1=[1], arg2=[2, 3])
+
+    assert dag.get_task("simple").sla == timedelta(minutes=30)
+    assert dag.get_task("mapped").sla == timedelta(minutes=30)
Review comment:
We are about to make SLA not work with mapped operators, so this test should assert on `execution_timeout` instead:
```suggestion
    with dag_maker(default_args={"execution_timeout": timedelta(minutes=30)}) as dag:
        MockOperator(task_id="simple", arg1=None, arg2=0)
        MockOperator.partial(task_id="mapped").expand(arg1=[1], arg2=[2, 3])
    assert dag.get_task("simple").execution_timeout == timedelta(minutes=30)
    assert dag.get_task("mapped").execution_timeout == timedelta(minutes=30)
```
##########
File path: tests/models/test_baseoperator.py
##########
@@ -877,3 +877,30 @@ def test_expand_mapped_task_instance_skipped_on_zero(dag_maker, session):
     )
     assert indices == [(-1, TaskInstanceState.SKIPPED)]
+
+
+def test_mapped_task_applies_default_args_classic(dag_maker):
+    with dag_maker(default_args={"sla": timedelta(minutes=30)}) as dag:
+        MockOperator(task_id="simple", arg1=None, arg2=0)
+        MockOperator.partial(task_id="mapped").expand(arg1=[1], arg2=[2, 3])
+
+    assert dag.get_task("simple").sla == timedelta(minutes=30)
+    assert dag.get_task("mapped").sla == timedelta(minutes=30)
+
+
+def test_mapped_task_applies_default_args_taskflow(dag_maker):
+    with dag_maker(default_args={"sla": timedelta(minutes=30)}) as dag:
+
+        @dag.task
+        def simple(arg):
+            pass
+
+        @dag.task
+        def mapped(arg):
+            pass
+
+        simple(arg=0)
+        mapped.expand(arg=[1, 2])
+
+    assert dag.get_task("simple").sla == timedelta(minutes=30)
+    assert dag.get_task("mapped").sla == timedelta(minutes=30)
Review comment:
```suggestion
    assert dag.get_task("simple").execution_timeout == timedelta(minutes=30)
    assert dag.get_task("mapped").execution_timeout == timedelta(minutes=30)
```
##########
File path: tests/models/test_baseoperator.py
##########
@@ -877,3 +877,30 @@ def test_expand_mapped_task_instance_skipped_on_zero(dag_maker, session):
     )
     assert indices == [(-1, TaskInstanceState.SKIPPED)]
+
+
+def test_mapped_task_applies_default_args_classic(dag_maker):
+    with dag_maker(default_args={"sla": timedelta(minutes=30)}) as dag:
+        MockOperator(task_id="simple", arg1=None, arg2=0)
+        MockOperator.partial(task_id="mapped").expand(arg1=[1], arg2=[2, 3])
+
+    assert dag.get_task("simple").sla == timedelta(minutes=30)
+    assert dag.get_task("mapped").sla == timedelta(minutes=30)
+
+
+def test_mapped_task_applies_default_args_taskflow(dag_maker):
+    with dag_maker(default_args={"sla": timedelta(minutes=30)}) as dag:
Review comment:
```suggestion
    with dag_maker(default_args={"execution_timeout": timedelta(minutes=30)}) as dag:
```