Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/01 09:44:39 UTC

[GitHub] [airflow] uranusjr opened a new pull request #22683: Correctly apply defaults to mapped task flow

uranusjr opened a new pull request #22683:
URL: https://github.com/apache/airflow/pull/22683


   Since a taskflow @task is mapped differently from classic operator
   classes, we missed applying DAG and task group level arguments to it.
   This adds that (and a couple of tests to make sure it sticks).
   
   A small refactoring is also introduced to merge the two default
   extraction functions into one because we don't really need two of them;
   they are always used together.
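
   For readers following along, here is a minimal, self-contained sketch of the
   kind of merge a single default-extraction helper could perform. It is not the
   code from this pull request: the name `merge_defaults_sketch`, its parameters,
   and the precedence order (DAG, then task group, then task) are assumptions
   made for illustration, and plain dicts stand in for the DAG and task group
   objects.

```python
from typing import Any, Dict, Optional, Tuple


def merge_defaults_sketch(
    dag_default_args: Optional[Dict[str, Any]],
    dag_params: Optional[Dict[str, Any]],
    task_group_default_args: Optional[Dict[str, Any]],
    task_params: Optional[Dict[str, Any]],
    task_default_args: Optional[Dict[str, Any]],
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Hypothetical helper: merge default args and params in one pass.

    Assumed precedence (later layers override earlier ones):
    DAG defaults < task group defaults < task-level defaults.
    """
    args: Dict[str, Any] = {}
    params: Dict[str, Any] = {}

    # Layer the default arguments from the broadest scope to the narrowest.
    for layer in (dag_default_args, task_group_default_args, task_default_args):
        if layer:
            args.update(layer)

    # Params are layered the same way: DAG-level first, then task-level.
    for layer in (dag_params, task_params):
        if layer:
            params.update(layer)

    return args, params


# Example: the task group overrides "retries"; the task overrides the "env" param.
partial_kwargs, default_params = merge_defaults_sketch(
    dag_default_args={"execution_timeout": 1800, "retries": 1},
    dag_params={"env": "dev"},
    task_group_default_args={"retries": 2},
    task_params={"env": "prod"},
    task_default_args=None,
)
```

   Returning both dicts from one call reflects the point made above: the two
   values are always needed together, so one function is enough.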
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #22683: Correctly apply defaults to mapped task flow

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #22683:
URL: https://github.com/apache/airflow/pull/22683#discussion_r840650188



##########
File path: airflow/decorators/base.py
##########
@@ -305,13 +311,21 @@ def expand(self, **map_kwargs: "Mappable") -> XComArg:
         prevent_duplicates(self.kwargs, map_kwargs, fail_reason="mapping already partial")
         ensure_xcomarg_return_value(map_kwargs)
 
-        partial_kwargs = self.kwargs.copy()
+        task_kwargs = self.kwargs.copy()
+        dag = task_kwargs.pop("dag", None) or DagContext.get_current_dag()
+        task_group = task_kwargs.pop("task_group", None) or TaskGroupContext.get_current_task_group(dag)
+
+        partial_kwargs, default_params = get_merged_defaults(
+            dag=dag,
+            task_group=task_group,
+            task_params=task_kwargs.pop("params", None),
+            task_default_args=task_kwargs.pop("default_args", None),

Review comment:
       Oh well, that's just weird. I wonder why it's there.







[GitHub] [airflow] ashb commented on a change in pull request #22683: Correctly apply defaults to mapped task flow

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #22683:
URL: https://github.com/apache/airflow/pull/22683#discussion_r840430783



##########
File path: airflow/decorators/base.py
##########
@@ -305,13 +311,21 @@ def expand(self, **map_kwargs: "Mappable") -> XComArg:
         prevent_duplicates(self.kwargs, map_kwargs, fail_reason="mapping already partial")
         ensure_xcomarg_return_value(map_kwargs)
 
-        partial_kwargs = self.kwargs.copy()
+        task_kwargs = self.kwargs.copy()
+        dag = task_kwargs.pop("dag", None) or DagContext.get_current_dag()
+        task_group = task_kwargs.pop("task_group", None) or TaskGroupContext.get_current_task_group(dag)
+
+        partial_kwargs, default_params = get_merged_defaults(
+            dag=dag,
+            task_group=task_group,
+            task_params=task_kwargs.pop("params", None),
+            task_default_args=task_kwargs.pop("default_args", None),

Review comment:
       `default_args` isn't a valid argument to Operators, is it?







[GitHub] [airflow] uranusjr commented on a change in pull request #22683: Correctly apply defaults to mapped task flow

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #22683:
URL: https://github.com/apache/airflow/pull/22683#discussion_r840437617



##########
File path: airflow/decorators/base.py
##########
@@ -305,13 +311,21 @@ def expand(self, **map_kwargs: "Mappable") -> XComArg:
         prevent_duplicates(self.kwargs, map_kwargs, fail_reason="mapping already partial")
         ensure_xcomarg_return_value(map_kwargs)
 
-        partial_kwargs = self.kwargs.copy()
+        task_kwargs = self.kwargs.copy()
+        dag = task_kwargs.pop("dag", None) or DagContext.get_current_dag()
+        task_group = task_kwargs.pop("task_group", None) or TaskGroupContext.get_current_task_group(dag)
+
+        partial_kwargs, default_params = get_merged_defaults(
+            dag=dag,
+            task_group=task_group,
+            task_params=task_kwargs.pop("params", None),
+            task_default_args=task_kwargs.pop("default_args", None),

Review comment:
       It actually is; there’s a `default_args` argument on BaseOperator.







[GitHub] [airflow] ashb commented on a change in pull request #22683: Correctly apply defaults to mapped task flow

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #22683:
URL: https://github.com/apache/airflow/pull/22683#discussion_r840431972



##########
File path: tests/models/test_baseoperator.py
##########
@@ -877,3 +877,30 @@ def test_expand_mapped_task_instance_skipped_on_zero(dag_maker, session):
     )
 
     assert indices == [(-1, TaskInstanceState.SKIPPED)]
+
+
+def test_mapped_task_applies_default_args_classic(dag_maker):
+    with dag_maker(default_args={"sla": timedelta(minutes=30)}) as dag:
+        MockOperator(task_id="simple", arg1=None, arg2=0)
+        MockOperator.partial(task_id="mapped").expand(arg1=[1], arg2=[2, 3])
+
+    assert dag.get_task("simple").sla == timedelta(minutes=30)
+    assert dag.get_task("mapped").sla == timedelta(minutes=30)

Review comment:
       We are about to make SLA not work with mapped operators, so use `execution_timeout` here instead:
   
   ```suggestion
       with dag_maker(default_args={"execution_timeout": timedelta(minutes=30)}) as dag:
           MockOperator(task_id="simple", arg1=None, arg2=0)
           MockOperator.partial(task_id="mapped").expand(arg1=[1], arg2=[2, 3])
   
       assert dag.get_task("simple").execution_timeout == timedelta(minutes=30)
       assert dag.get_task("mapped").execution_timeout == timedelta(minutes=30)
   ```

##########
File path: tests/models/test_baseoperator.py
##########
@@ -877,3 +877,30 @@ def test_expand_mapped_task_instance_skipped_on_zero(dag_maker, session):
     )
 
     assert indices == [(-1, TaskInstanceState.SKIPPED)]
+
+
+def test_mapped_task_applies_default_args_classic(dag_maker):
+    with dag_maker(default_args={"sla": timedelta(minutes=30)}) as dag:
+        MockOperator(task_id="simple", arg1=None, arg2=0)
+        MockOperator.partial(task_id="mapped").expand(arg1=[1], arg2=[2, 3])
+
+    assert dag.get_task("simple").sla == timedelta(minutes=30)
+    assert dag.get_task("mapped").sla == timedelta(minutes=30)
+
+
+def test_mapped_task_applies_default_args_taskflow(dag_maker):
+    with dag_maker(default_args={"sla": timedelta(minutes=30)}) as dag:
+
+        @dag.task
+        def simple(arg):
+            pass
+
+        @dag.task
+        def mapped(arg):
+            pass
+
+        simple(arg=0)
+        mapped.expand(arg=[1, 2])
+
+    assert dag.get_task("simple").sla == timedelta(minutes=30)
+    assert dag.get_task("mapped").sla == timedelta(minutes=30)

Review comment:
       ```suggestion
       assert dag.get_task("simple").execution_timeout == timedelta(minutes=30)
       assert dag.get_task("mapped").execution_timeout == timedelta(minutes=30)
   ```

##########
File path: tests/models/test_baseoperator.py
##########
@@ -877,3 +877,30 @@ def test_expand_mapped_task_instance_skipped_on_zero(dag_maker, session):
     )
 
     assert indices == [(-1, TaskInstanceState.SKIPPED)]
+
+
+def test_mapped_task_applies_default_args_classic(dag_maker):
+    with dag_maker(default_args={"sla": timedelta(minutes=30)}) as dag:
+        MockOperator(task_id="simple", arg1=None, arg2=0)
+        MockOperator.partial(task_id="mapped").expand(arg1=[1], arg2=[2, 3])
+
+    assert dag.get_task("simple").sla == timedelta(minutes=30)
+    assert dag.get_task("mapped").sla == timedelta(minutes=30)
+
+
+def test_mapped_task_applies_default_args_taskflow(dag_maker):
+    with dag_maker(default_args={"sla": timedelta(minutes=30)}) as dag:

Review comment:
       ```suggestion
       with dag_maker(default_args={"execution_timeout": timedelta(minutes=30)}) as dag:
   ```



