Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/05/10 11:19:39 UTC

[GitHub] [airflow] turbaszek opened a new pull request #8805: Resolve upstream tasks when template field is XComArg

turbaszek opened a new pull request #8805:
URL: https://github.com/apache/airflow/pull/8805


   closes: #8054
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r422637578



##########
File path: airflow/models/baseoperator.py
##########
@@ -634,6 +651,33 @@ def deps(self) -> Set[BaseTIDep]:
             NotPreviouslySkippedDep(),
         }
 
+    def _set_xcomargs_dependencies(self) -> None:
+        """
+        Resolves upstream dependencies of a task. In this way passing an ``XComArg``
+        as value for a template field will result in creating upstream relation between
+        two tasks.
+
+        **Example**: ::
+
+            with DAG(...):
+                generate_content = GenerateContentOperator(task_id="generate_content")
+                send_email = EmailOperator(..., html_content=generate_content.output)
+
+            # This is equivalent to
+            with DAG(...):
+                generate_content = GenerateContentOperator(task_id="generate_content")
+                send_email = EmailOperator(
+                    ..., html_content="{{ task_instance.xcom_pull('generate_content') }}"
+                )
+                generate_content >> send_email
+
+        """
+        from airflow.models.xcom_arg import XComArg
+        for field in self.template_fields:
+            arg = getattr(self, field)
+            if isinstance(arg, XComArg):
+                self.set_upstream(arg.operator)

Review comment:
       > what about pythonOperator? 
   
   What do you mean here, @evgenyshulman?
   
   > usually, template fields are traversed with `operator.render_template` call, we probably should do the same here
   
   I am not sure I fully understand you here. What exactly is wrong?







[GitHub] [airflow] turbaszek edited a comment on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek edited a comment on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636327497


   @ashb I forgot what the problem with the metaclass was:
   ```python
           with DAG(dag_id='xcomargs_test', default_args={"start_date": datetime.today()}):
               op1 = DummyOperator(task_id="op1")
               op2 = CustomOp(task_id="op2")
               op2.field = op1.output  # value is set after init
   
           assert op1 in op2.upstream_list
   ```
   This needs to be solved (see your comment https://github.com/apache/airflow/pull/8805#discussion_r423205936).
   
   But this hints at another possible solution without a metaclass:
   ```python
   class BaseOperator():
       def __setattr__(self, key, value):
           super().__setattr__(key, value)
           # Resolve upstreams set by assigning an XComArg after initializing
           # an operator, example:
           #   op = BashOperator()
           #   op.bash_command = "sleep 1"
           from airflow.models.xcom_arg import XComArg
           if key in self.template_fields:
               self.set_xcomargs_dependencies()
   ```
   This one is nice and simple; however, I am not sure whether it will degrade performance in any way.
   
   @evgenyshulman @casassg @ashb @mik-laj thoughts? 
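
To make the `__setattr__` idea concrete, here is a minimal, self-contained sketch. `FakeXComArg` and `FakeOperator` are hypothetical stand-ins written for this illustration, not Airflow classes; only the hook pattern mirrors the snippet above.

```python
class FakeXComArg:
    """Hypothetical stand-in for airflow.models.xcom_arg.XComArg."""

    def __init__(self, operator):
        self.operator = operator


class FakeOperator:
    """Hypothetical stand-in for BaseOperator; not the real Airflow class."""

    template_fields = ("field",)

    def __init__(self, task_id):
        # Bookkeeping attributes are set before any template field,
        # because the __setattr__ hook below relies on upstream_list.
        self.task_id = task_id
        self.upstream_list = []
        self.field = None

    @property
    def output(self):
        return FakeXComArg(self)

    def set_upstream(self, other):
        if other not in self.upstream_list:
            self.upstream_list.append(other)

    def set_xcomargs_dependencies(self):
        for name in self.template_fields:
            arg = getattr(self, name, None)
            if isinstance(arg, FakeXComArg):
                self.set_upstream(arg.operator)

    def __setattr__(self, key, value):
        super().__setattr__(key, value)
        # Re-resolve upstream tasks whenever a template field is assigned.
        if key in self.template_fields:
            self.set_xcomargs_dependencies()


op1 = FakeOperator(task_id="op1")
op2 = FakeOperator(task_id="op2")
op2.field = op1.output  # value is set after init
```

In this sketch the hook runs on every assignment to a template field, including the ones made inside `__init__`, which is the likely source of the overhead the comment worries about.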





[GitHub] [airflow] turbaszek commented on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-641253096


   Meh. The serialization doesn't work because `__call__` has a different signature than `BaseOperator.__init__`. I think this could be easily solved by calling `super().__call__()` instead of `type.__call__` in the metaclass, but... we have `@apply_defaults`, so this will fail because we pass `*args` and `**kwargs`.
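
The signature mismatch can be reproduced in plain Python. The sketch below assumes that the affected code introspects the constructor through `inspect.signature(cls)`; that is an assumption for illustration, not something confirmed in this thread. `Meta`, `Plain` and `WithMeta` are hypothetical names.

```python
import inspect


class Meta(type):
    def __call__(cls, *args, **kwargs):
        # Generic signature, like the metaclass discussed in this PR.
        return type.__call__(cls, *args, **kwargs)


class Plain:
    def __init__(self, task_id, retries=0):
        self.task_id = task_id
        self.retries = retries


class WithMeta(metaclass=Meta):
    def __init__(self, task_id, retries=0):
        self.task_id = task_id
        self.retries = retries


# Introspecting the class picks up the metaclass __call__ when one is defined.
plain_params = list(inspect.signature(Plain).parameters)
meta_params = list(inspect.signature(WithMeta).parameters)

# Introspecting __init__ directly still exposes the real parameters.
init_params = list(inspect.signature(WithMeta.__init__).parameters)
```

Under that assumption, code that derives defaults from the class signature would see only `args` and `kwargs` once the metaclass is in place.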





[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r425141185



##########
File path: airflow/models/baseoperator.py
##########
@@ -634,6 +634,43 @@ def deps(self) -> Set[BaseTIDep]:
             NotPreviouslySkippedDep(),
         }
 
+    def set_xcomargs_dependencies(self) -> None:

Review comment:
       Hm, I'm not sure how this should be approached. What I would do is:
   ```python
       def set_xcomargs_dependencies(self) -> None:
           from airflow.models.xcom_arg import XComArg
   
           def apply_set_upstream_and_lineage(arg: Any):
               if isinstance(arg, XComArg):
                   op: BaseOperator = arg.operator
                   self.set_upstream(op)
                   # Pick up any outlets from direct upstream tasks that have outlets defined,
                   # so that if A -> B and B does not have inlets but A has outlets, then
                   # these are provided as inlets to B.
                   self.add_inlets(op.get_outlet_defs())
               ...
   
           for field in self.template_fields:
               arg = getattr(self, field)
               apply_set_upstream_and_lineage(arg)
   ```
   However, I am not sure if this is the right approach. Not all outlets of A have to be inlets of B. The proposed behaviour is what we get when we use `inlets=AUTO`, and I'm not sure this is something that should be done out of the box.







[GitHub] [airflow] evgenyshulman commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
evgenyshulman commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r422642296



##########
File path: airflow/models/baseoperator.py
##########
@@ -634,6 +651,33 @@ def deps(self) -> Set[BaseTIDep]:
             NotPreviouslySkippedDep(),
         }
 
+    def _set_xcomargs_dependencies(self) -> None:
+        """
+        Resolves upstream dependencies of a task. In this way passing an ``XComArg``
+        as value for a template field will result in creating upstream relation between
+        two tasks.
+
+        **Example**: ::
+
+            with DAG(...):
+                generate_content = GenerateContentOperator(task_id="generate_content")
+                send_email = EmailOperator(..., html_content=generate_content.output)
+
+            # This is equivalent to
+            with DAG(...):
+                generate_content = GenerateContentOperator(task_id="generate_content")
+                send_email = EmailOperator(
+                    ..., html_content="{{ task_instance.xcom_pull('generate_content') }}"
+                )
+                generate_content >> send_email
+
+        """
+        from airflow.models.xcom_arg import XComArg
+        for field in self.template_fields:
+            arg = getattr(self, field)
+            if isinstance(arg, XComArg):
+                self.set_upstream(arg.operator)

Review comment:
       ```python
   op = PythonOperator(op_kwargs={"a": xarg_value}, op_args=[xarg_value])
   ```
   `xarg_value` will not be resolved into a dependency, although at `.execute` the XCom values are going to be used automatically.
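
One way to cover this case is to walk nested containers recursively. The following is a minimal sketch with a hypothetical `FakeXComArg` stand-in and a hypothetical `collect_upstream` helper; treating dict values (and not keys) as candidates is an assumption made here for illustration.

```python
from typing import Any, List


class FakeXComArg:
    """Hypothetical stand-in for XComArg; stores the id of its producing task."""

    def __init__(self, operator: str):
        self.operator = operator


def collect_upstream(arg: Any, found: List[str]) -> List[str]:
    """Recursively collect operators referenced by XComArg-like values."""
    if isinstance(arg, FakeXComArg):
        if arg.operator not in found:
            found.append(arg.operator)
    elif isinstance(arg, (tuple, set, list)):
        for elem in arg:
            collect_upstream(elem, found)
    elif isinstance(arg, dict):
        # Assumption: only dict values can carry XComArg references.
        for elem in arg.values():
            collect_upstream(elem, found)
    return found


xarg_value = FakeXComArg("generate_content")
op_kwargs = {"a": xarg_value}
op_args = [xarg_value, "plain string"]

upstream = collect_upstream([op_kwargs, op_args], [])
```

Here both `op_kwargs` and `op_args` lead to the same producing task, which is recorded once.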







[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r440054500



##########
File path: airflow/models/baseoperator.py
##########
@@ -633,6 +672,56 @@ def deps(self) -> Set[BaseTIDep]:
             NotPreviouslySkippedDep(),
         }
 
+    def prepare_for_execution(self) -> "BaseOperator":
+        """
+        Lock task for execution to disable custom action in __setattr__ and
+        returns a copy of the task
+        """
+        other = copy.copy(self)
+        other._lock_for_execution = True

Review comment:
       ```suggestion
           other._lock_for_execution = True  # pylint: disable=protected-access
   ```







[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r422638768



##########
File path: airflow/models/baseoperator.py
##########
@@ -60,9 +60,25 @@
 ScheduleInterval = Union[str, timedelta, relativedelta]
 
 
+class BaseOperatorMeta(type):
+    """
+    Base metaclass of BaseOperator.
+    """
+    def __call__(cls, *args, **kwargs):
+        """
+        Called when you call BaseOperator(). In this way we are able to perform an action
+        after initializing an operator no matter where  the ``super().__init__`` is called
+        (before or after assign of new attributes in a custom operator).
+        """
+        obj = type.__call__(cls, *args, **kwargs)
+        # Set upstream task defined by XComArgs passed to template fields of an operator
+        obj._set_xcomargs_dependencies()  # pylint: disable=protected-access

Review comment:
       > what happens when a user creates an operator without DAG?
   
   Well, nothing. If the question is about https://github.com/apache/airflow/pull/8805#discussion_r422633456 then AirflowException on missing DAG is raised. I've added a test for that.
   
   > 1. we can fix set_upstream
   
   Is this related to this comment https://github.com/apache/airflow/pull/8805#discussion_r422633321 ?
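
The property described in the `__call__` docstring above (the hook runs after `__init__` has fully finished, wherever `super().__init__` is called) can be checked with a small sketch. All class names here are hypothetical and unrelated to Airflow's real classes.

```python
class PostInitMeta(type):
    def __call__(cls, *args, **kwargs):
        # type.__call__ runs __new__ and the complete __init__ chain first.
        obj = type.__call__(cls, *args, **kwargs)
        obj.after_init()
        return obj


class Base(metaclass=PostInitMeta):
    def __init__(self):
        self.seen = None

    def after_init(self):
        # Records whatever value `field` has once construction is complete.
        self.seen = getattr(self, "field", None)


class SuperFirst(Base):
    def __init__(self, field):
        super().__init__()
        self.field = field  # assigned after super().__init__()


class SuperLast(Base):
    def __init__(self, field):
        self.field = field  # assigned before super().__init__()
        super().__init__()


first = SuperFirst("x")
last = SuperLast("y")
```

In both subclasses the hook sees the final value of `field`, which is what a check placed inside `Base.__init__` could not guarantee for `SuperFirst`.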
   







[GitHub] [airflow] turbaszek removed a comment on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek removed a comment on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636327033


   > > Could we use the `__exit__` from the dag to fix this one case (but there are others that wouldn't be caught by that).
   > 
   > Exactly
   > 
   > > Given how rare metaprogramming is in Python, I think the metaclass approach gives a more complete solution, it catches all possible ways of assigning deps between a DAG.
   > 
   > I agree, in our codebase, there's only one operator that will need to be fixed.
   > 
   > > Yes, this is technically a breaking change, but I think it will affect almost no one. And I think the only "break" would be that using XComArg wouldn't work with a custom operator, unless your metaclass inherits this new metaclass. Is that correct? (i.e. the dag would still run, the user's metaclass would be the one that is used.)
   > 
   > I think yes
   > 
   > > So I'm +1 for metaclass, if I've understood it all correctly.
   > 
   > Happy to bring it back!
   
   





[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r425099398



##########
File path: airflow/models/baseoperator.py
##########
@@ -634,6 +634,43 @@ def deps(self) -> Set[BaseTIDep]:
             NotPreviouslySkippedDep(),
         }
 
+    def set_xcomargs_dependencies(self) -> None:

Review comment:
       No, it only resolves up/down stream. I will take a look at the lineage 







[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r422651378



##########
File path: airflow/models/baseoperator.py
##########
@@ -60,9 +60,25 @@
 ScheduleInterval = Union[str, timedelta, relativedelta]
 
 
+class BaseOperatorMeta(type):
+    """
+    Base metaclass of BaseOperator.
+    """
+    def __call__(cls, *args, **kwargs):
+        """
+        Called when you call BaseOperator(). In this way we are able to perform an action
+        after initializing an operator no matter where  the ``super().__init__`` is called
+        (before or after assign of new attributes in a custom operator).
+        """
+        obj = type.__call__(cls, *args, **kwargs)
+        # Set upstream task defined by XComArgs passed to template fields of an operator
+        obj._set_xcomargs_dependencies()  # pylint: disable=protected-access

Review comment:
       Additionally, I think we will have to introduce this metaclass sooner or later. Here's @mik-laj's PR where he tried to address the `@apply_defaults`:
   https://github.com/apache/airflow/pull/7450
   
   One of the reasons to do that (from [devlist thread](https://lists.apache.org/thread.html/r4cf535e5dec1fbf8330f3c86031519092ca4820b382d3738413b23cd%40%3Cdev.airflow.apache.org%3E)):
   > I saw that PR today and it made me realize I've not been using apply_defaults in much of my code! Look forward to this being an automaic operation via a MetaClass, great work :) 
   
   However, this point is very valid:
   > meta_class introduce backward compatibility problems for users who already have metaclass defined for their operator ( they will have to inherit from BaseOperatorMeta)
   
   I suppose that we can add some refactor step once we have 2.0 and https://github.com/apache/airflow/issues/8765 . 
   
   Of course we can try to add this in `DagBag.bag_dag` (like policy):
   https://github.com/apache/airflow/blob/e1cc17e84856bd121724c634cb1809581c9c97db/airflow/models/dagbag.py#L322-L323
   or in `DAG.create_dagrun` but this in my opinion is not the best approach. Happy to discuss @evgenyshulman @potiuk @kaxil 
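
The backward compatibility concern quoted above follows from how Python resolves metaclasses. Below is a minimal sketch; the classes are empty stand-ins that only borrow the names from this PR, and `UserMeta`, `FixedUserMeta`, `BrokenOperator` and `FixedOperator` are hypothetical.

```python
class BaseOperatorMeta(type):
    """Empty stand-in for the metaclass proposed in this PR."""


class BaseOperator(metaclass=BaseOperatorMeta):
    """Empty stand-in for the real BaseOperator."""


class UserMeta(type):
    """A user's existing metaclass that knows nothing about BaseOperatorMeta."""


try:
    # Python refuses to create the class: neither metaclass derives from the other.
    class BrokenOperator(BaseOperator, metaclass=UserMeta):
        pass

    conflict = None
except TypeError as err:
    conflict = str(err)


class FixedUserMeta(UserMeta, BaseOperatorMeta):
    """Deriving from both metaclasses resolves the conflict."""


class FixedOperator(BaseOperator, metaclass=FixedUserMeta):
    pass
```

So a user with a custom metaclass would need a one-line change to its bases, which is the kind of step a refactoring tool could automate.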







[GitHub] [airflow] ashb commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r425011133



##########
File path: airflow/providers/google/cloud/operators/automl.py
##########
@@ -558,7 +558,7 @@ class AutoMLTablesUpdateDatasetOperator(BaseOperator):
     :type gcp_conn_id: str
     """
 
-    template_fields = ("dataset", "update_mask", "location", "project_id")
+    template_fields = ("dataset", "update_mask", "location")

Review comment:
       Unrelated change?

##########
File path: airflow/providers/google/marketing_platform/operators/display_video.py
##########
@@ -565,7 +565,7 @@ class GoogleDisplayVideo360SDFtoGCSOperator(BaseOperator):
     :type delegate_to: str
     """
 
-    template_fields = ("operation_name", "bucket_name", "object_name", "body_request")
+    template_fields = ("operation_name", "bucket_name", "object_name")

Review comment:
       Unrelated change?







[GitHub] [airflow] ashb commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r425008181



##########
File path: airflow/models/baseoperator.py
##########
@@ -634,6 +650,43 @@ def deps(self) -> Set[BaseTIDep]:
             NotPreviouslySkippedDep(),
         }
 
+    def _set_xcomargs_dependencies(self) -> None:
+        """
+        Resolves upstream dependencies of a task. In this way passing an ``XComArg``
+        as value for a template field will result in creating upstream relation between
+        two tasks.
+
+        **Example**: ::
+
+            with DAG(...):
+                generate_content = GenerateContentOperator(task_id="generate_content")
+                send_email = EmailOperator(..., html_content=generate_content.output)
+
+            # This is equivalent to
+            with DAG(...):
+                generate_content = GenerateContentOperator(task_id="generate_content")
+                send_email = EmailOperator(
+                    ..., html_content="{{ task_instance.xcom_pull('generate_content') }}"
+                )
+                generate_content >> send_email
+
+        """
+        from airflow.models.xcom_arg import XComArg
+
+        def apply_set_upstream(arg: Any):
+            if isinstance(arg, XComArg):
+                self.set_upstream(arg.operator)
+            elif isinstance(arg, (tuple, set, list)):
+                for elem in arg:
+                    apply_set_upstream(elem)
+            elif isinstance(arg, dict):

Review comment:
       https://github.com/apache/airflow/commit/d567f9ab8d5fdd6d18509fd8d623b26795eca25c







[GitHub] [airflow] turbaszek commented on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636327497


   @ashb I forgot what the problem with the metaclass was:
   ```python
           with DAG(dag_id='xcomargs_test', default_args={"start_date": datetime.today()}):
               op1 = DummyOperator(task_id="op1")
               op2 = CustomOp(task_id="op2")
               op2.field = op1.output  # value is set after init
   
           assert op1 in op2.upstream_list
   ```
   This needs to be solved as well: https://github.com/apache/airflow/pull/8805#discussion_r423205936
   
   But this hints at another possible solution without a metaclass:
   ```python
   class BaseOperator():
       def __setattr__(self, key, value):
           super().__setattr__(key, value)
           # Resolve upstreams set by assigning an XComArg after initializing
           # an operator, example:
           #   op = BashOperator()
           #   op.bash_command = "sleep 1"
           from airflow.models.xcom_arg import XComArg
           if key in self.template_fields:
               self.set_xcomargs_dependencies()
   ```
   This one is nice and simple; however, I am not sure whether it will degrade performance in any way.
   
   @evgenyshulman @casassg @ashb @mik-laj thoughts? 





[GitHub] [airflow] turbaszek edited a comment on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek edited a comment on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636334773


   I did a simple test:
   ```python
   from datetime import datetime
   
   from airflow import DAG
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.utils.decorators import apply_defaults
   from scripts.perf.perf_kit import python, memory, repeat_and_time
   
   fields = [f"field{i}" for i in range(30)]
   
   
   class CustomOp(DummyOperator):
       template_fields = fields  # 30 template fields
   
       @apply_defaults
       def __init__(self, *args, **kwargs):
           super().__init__(*args, task_id=kwargs["task_id"])
           for key in kwargs:
               if key.startswith("field"):
                   setattr(self, key, kwargs[key])
   
   
   if __name__ == '__main__':
       N = 10
       OP_N = 100
   
       @repeat_and_time.timing(N)
       @repeat_and_time.repeat(N)
       def case():
           with DAG("xcomargs_test", default_args={"start_date": datetime.today()}):
               op1 = DummyOperator(task_id="op1")
               for i in range(OP_N):
                   kwargs = {k: op1.output for k in fields}
                   CustomOp(task_id=f"task_{i}", **kwargs)
   
       print("OPs: ", OP_N)
       case()
   ```
   Average time for 100 OPs in single DAG:
   - metaclass: 471.470 ms
   - setattr + if: 7072.531 ms
   - setattr: 7208.424 ms
   
   So using setattr is about **14** times slower than the metaclass...
   
   But to make everything work smoothly we can mix those two approaches:
   ```python
   class BaseOperatorMeta(type):
       def __call__(cls, *args, **kwargs):
           obj: BaseOperator = type.__call__(cls, *args, **kwargs)
           # Set upstream task defined by XComArgs passed to template fields of an operator
           obj.set_xcomargs_dependencies()
           obj._instantiated = True
           return obj
   
   
   class BaseOperator(Operator, LoggingMixin, metaclass=BaseOperatorMeta):
       _instantiated = False
   
       def __setattr__(self, key, value):
           super().__setattr__(key, value)
           if self._instantiated and key in self.template_fields:
               self.set_xcomargs_dependencies()
   ```
   
   This gives the following result: **471.381 ms**
   
   Edit: additionally, we should add a `_locked_for_execution` flag to BaseOperator and set it to True before executing. Then adjust `__setattr__` to
   ```python
       def __setattr__(self, key, value):
           super().__setattr__(key, value)
           if self._instantiated and not self._locked_for_execution and key in self.template_fields:
               self.set_xcomargs_dependencies()
   ```
   Why? We don't want to do any custom action (including resolving upstream tasks) during execution, and it may happen that someone assigns or reassigns a value to a template field attribute in `execute()`.
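
The combined behaviour (resolve once after construction, again on later assignments, and never while locked) can be sketched without Airflow. `FakeOperatorMeta`, `FakeOperator` and the `resolve_calls` counter are hypothetical; the counter exists only to make the number of resolutions observable.

```python
class FakeOperatorMeta(type):
    def __call__(cls, *args, **kwargs):
        obj = type.__call__(cls, *args, **kwargs)
        obj.set_xcomargs_dependencies()  # resolve once, after __init__ finishes
        obj._instantiated = True
        return obj


class FakeOperator(metaclass=FakeOperatorMeta):
    template_fields = ("field",)
    _instantiated = False
    _locked_for_execution = False
    resolve_calls = 0  # hypothetical counter, for observation only

    def __init__(self, field=None):
        self.field = field

    def set_xcomargs_dependencies(self):
        self.resolve_calls += 1

    def __setattr__(self, key, value):
        super().__setattr__(key, value)
        if (
            self._instantiated
            and not self._locked_for_execution
            and key in self.template_fields
        ):
            self.set_xcomargs_dependencies()


op = FakeOperator(field="a")
calls_after_init = op.resolve_calls  # resolved by the metaclass only

op.field = "b"  # post-init assignment triggers one more resolution
calls_after_assign = op.resolve_calls

op._locked_for_execution = True  # e.g. set just before execute()
op.field = "c"  # the hook is skipped while locked
calls_while_locked = op.resolve_calls
```

Assignments made inside `__init__` skip the hook because `_instantiated` is still `False`, which would explain why the mixed approach costs about the same as the metaclass alone in the benchmark above.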





[GitHub] [airflow] turbaszek removed a comment on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek removed a comment on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-641253096


   Meh. The serialization doesn't work because `__call__` has a different signature than `BaseOperator.__init__`. I think this could be easily solved by calling `super().__call__()` instead of `type.__call__` in the metaclass, but... we have `@apply_defaults`, so this will fail because we pass `*args` and `**kwargs`.





[GitHub] [airflow] turbaszek edited a comment on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek edited a comment on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636334773


   I did a simple test:
   ```python
   from datetime import datetime
   
   from airflow import DAG
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.utils.decorators import apply_defaults
   from scripts.perf.perf_kit import python, memory, repeat_and_time
   
   fields = [f"field{i}" for i in range(30)]
   
   
   class CustomOp(DummyOperator):
       template_fields = fields
   
       @apply_defaults
       def __init__(self, *args, **kwargs):
           super().__init__(*args, task_id=kwargs["task_id"])
           for key in kwargs:
               if key.startswith("field"):
                   setattr(self, key, kwargs[key])
   
   
   if __name__ == '__main__':
       N = 10
       OP_N = 100
   
       @repeat_and_time.timing(N)
       @repeat_and_time.repeat(N)
       def case():
           with DAG("xcomargs_test", default_args={"start_date": datetime.today()}):
               op1 = DummyOperator(task_id="op1")
               for i in range(OP_N):
                   kwargs = {k: op1.output for k in fields}
                   CustomOp(task_id=f"task_{i}", **kwargs)
   
       print("OPs: ", OP_N)
       case()
   ```
   Average time for 100 OPs in single DAG:
   - metaclass: 492.990 ms
   - setattr + if: 7072.531 ms
   - setattr: 7208.424 ms
   
   So using setattr is about **14** times slower than the metaclass... 
   
   But to make everything work smoothly we can mix those two approaches:
   ```python
   class BaseOperatorMeta(type):
       def __call__(cls, *args, **kwargs):
           obj: BaseOperator = type.__call__(cls, *args, **kwargs)
           # Set upstream task defined by XComArgs passed to template fields of an operator
           obj.set_xcomargs_dependencies()
           obj._instantiated = True
           return obj
   
   
   class BaseOperator(Operator, LoggingMixin, metaclass=BaseOperatorMeta):
       _instantiated = False
   
       def __setattr__(self, key, value):
           super().__setattr__(key, value)
           if self._instantiated and key in self.template_fields:
               self.set_xcomargs_dependencies()
   ```
   
   This gives the following result: **471.381 ms**
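
The mixed approach can be tried in isolation. This is a minimal sketch under stated assumptions: `XComArgStub`, `OperatorMeta`, `MiniOperator` and `resolve_dependencies` are hypothetical stand-ins, not Airflow classes, and the "dependency" is just a set of task ids. It shows one resolution pass after construction (metaclass) and a further pass only when a template field is reassigned later (`__setattr__`).

```python
class XComArgStub:
    """Hypothetical stand-in for XComArg: remembers the producing operator."""

    def __init__(self, operator):
        self.operator = operator


class OperatorMeta(type):
    def __call__(cls, *args, **kwargs):
        obj = super().__call__(*args, **kwargs)
        obj.resolve_dependencies()  # single pass once __init__ has finished
        obj._instantiated = True    # from now on __setattr__ re-resolves
        return obj


class MiniOperator(metaclass=OperatorMeta):
    template_fields = ("field",)
    _instantiated = False

    def __init__(self, task_id, field=None):
        self.task_id = task_id
        self.upstream = set()
        self.resolve_calls = 0
        self.field = field

    def __setattr__(self, key, value):
        super().__setattr__(key, value)
        # Cheap check during __init__: the flag is still False at that point.
        if self._instantiated and key in self.template_fields:
            self.resolve_dependencies()

    def resolve_dependencies(self):
        self.resolve_calls += 1
        for name in self.template_fields:
            value = getattr(self, name, None)
            if isinstance(value, XComArgStub):
                self.upstream.add(value.operator.task_id)


op1 = MiniOperator("op1")
op2 = MiniOperator("op2", field=XComArgStub(op1))  # resolved by the metaclass
op3 = MiniOperator("op3")
op3.field = XComArgStub(op2)                       # resolved by __setattr__
```

During `__init__` the `__setattr__` hook does no extra work, which is presumably why the timing stays close to the metaclass-only variant.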





[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r425100781



##########
File path: airflow/providers/google/marketing_platform/operators/display_video.py
##########
@@ -565,7 +565,7 @@ class GoogleDisplayVideo360SDFtoGCSOperator(BaseOperator):
     :type delegate_to: str
     """
 
-    template_fields = ("operation_name", "bucket_name", "object_name", "body_request")
+    template_fields = ("operation_name", "bucket_name", "object_name")

Review comment:
       I will rebase; it was fixed in another PR.
   







[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r422638768



##########
File path: airflow/models/baseoperator.py
##########
@@ -60,9 +60,25 @@
 ScheduleInterval = Union[str, timedelta, relativedelta]
 
 
+class BaseOperatorMeta(type):
+    """
+    Base metaclass of BaseOperator.
+    """
+    def __call__(cls, *args, **kwargs):
+        """
+        Called when you call BaseOperator(). In this way we are able to perform an action
+        after initializing an operator no matter where  the ``super().__init__`` is called
+        (before or after assign of new attributes in a custom operator).
+        """
+        obj = type.__call__(cls, *args, **kwargs)
+        # Set upstream task defined by XComArgs passed to template fields of an operator
+        obj._set_xcomargs_dependencies()  # pylint: disable=protected-access

Review comment:
       > meta_class introduce backward compatibility problems for users who already have metaclass defined for their operator ( they will have to inherit from BaseOperatorMeta)
   
   True
   
   > what happens when a user creates an operator without DAG?
   
   Well, nothing. If the question is about https://github.com/apache/airflow/pull/8805#discussion_r422633456 then AirflowException on missing DAG is raised. I've added a test for that.
   
   > 1. we can fix set_upstream
   
   Is this related to this comment https://github.com/apache/airflow/pull/8805#discussion_r422633321 ?
   
   > 2. we can just finalize Operator implementation when we add it to dag ( thus no metaclass is required)
   
   The user can either provide the `dag` argument to the operator or, if it's `None`, we call `DagContext.get_current_dag()`. Both happen during the `__init__` call, so I'm not sure I understand when the "add to dag" moment happens.
   
   







[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r437473652



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -297,7 +297,7 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
     _decorated_fields = {'executor_config'}
 
     _CONSTRUCTOR_PARAMS = {
-        k: v.default for k, v in signature(BaseOperator).parameters.items()
+        k: v.default for k, v in signature(BaseOperator.__init__).parameters.items()

Review comment:
       Because:
   ```python
   In [3]: class meta(type):
      ...:     def __call__(cls, *args, **kwargs):
      ...:         return type.__call__(cls, *args, **kwargs)
      ...:
   
   In [7]: class Klass(metaclass=meta):
      ...:     def __init__(self, a, b, c):
      ...:         pass
      ...:
   
   In [8]: signature(Klass).parameters
   Out[8]: mappingproxy({'args': <Parameter "*args">, 'kwargs': <Parameter "**kwargs">})
   
   In [9]: signature(Klass.__init__).parameters
   Out[9]:
   mappingproxy({'self': <Parameter "self">,
                 'a': <Parameter "a">,
                 'b': <Parameter "b">,
                 'c': <Parameter "c">})
   ```







[GitHub] [airflow] ashb commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r425009313



##########
File path: airflow/example_dags/example_xcomargs.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the XComArgs."""
+
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.utils.dates import days_ago
+
+args = {
+    'owner': 'airflow',
+    'start_date': days_ago(2),
+}
+
+
+def dummy(*args, **kwargs):
+    """Dummy function"""
+    return "pass"
+
+
+with DAG(
+    dag_id='example_xcom_args',
+    default_args=args,
+    schedule_interval=None,
+    tags=['example']
+) as dag:
+    task1 = PythonOperator(
+        task_id='task1',
+        python_callable=dummy,
+    )
+
+    task2 = PythonOperator(
+        task_id='task2',
+        python_callable=dummy,
+        op_kwargs={"dummy": task1.output},
+    )
+
+    task3 = PythonOperator(
+        task_id='task3',
+        python_callable=dummy,
+    )
+
+    task3.op_kwargs = {"dummy": task2.output}

Review comment:
       Is this set afterwards to check that it works?
   
   If so, we should have that in a test, but not in an example dag (as we don't want to encourage users to do this, since they often copy what our example dags do).







[GitHub] [airflow] turbaszek commented on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-635363551


   @ashb the currently proposed approach resolves the relation when creating a DagBag:
   ```python
   for task in dag.tasks:
       settings.policy(task)
       task.set_xcomargs_dependencies() # here
   ```
   
   The problem is that in such case
   ```python
   with DAG(dag_id='xcomargs_test_3', default_args={"start_date": datetime.today()}) as dag3:
       op1 = DummyOperator(task_id="op1")
       op2 = CustomOp(task_id="op2", field=op1.output)
   
   assert op1 in op2.upstream_list  # False
   ```
   op1 is not upstream of op2 because the relation is not resolved (no DagBag is created).
   The same happens with policies, but those are not as important as relations.
   
   The metaclass approach would be more generic and would always work. A possible problem is breaking backward compatibility. Users may use a custom metaclass in their operators and will have to upgrade. 
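
The backward-compatibility concern can be illustrated in plain Python. This is a minimal sketch, not Airflow code: `BaseOperatorMeta` and `BaseOperator` here are empty stand-ins, and `UnrelatedMeta` / `UpgradedMeta` are hypothetical user metaclasses. It shows that a user metaclass unrelated to the base's metaclass is rejected at class-definition time, while one that inherits from it is accepted.

```python
class BaseOperatorMeta(type):
    """Stand-in for the metaclass proposed in this PR."""


class BaseOperator(metaclass=BaseOperatorMeta):
    pass


class UnrelatedMeta(type):
    """A user's pre-existing metaclass that does not know BaseOperatorMeta."""


class UpgradedMeta(BaseOperatorMeta):
    """The same user metaclass after inheriting from BaseOperatorMeta."""


try:
    # Python requires the metaclass to be a subclass of the metaclasses of
    # all bases; otherwise class creation raises TypeError.
    class BrokenOperator(BaseOperator, metaclass=UnrelatedMeta):
        pass

    conflict = None
except TypeError as err:
    conflict = err


class UpgradedOperator(BaseOperator, metaclass=UpgradedMeta):
    pass


print(conflict)
```

Under these assumptions, the upgrade step for affected users is limited to changing the base of their metaclass.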





[GitHub] [airflow] turbaszek closed pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek closed pull request #8805:
URL: https://github.com/apache/airflow/pull/8805


   





[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r425098996



##########
File path: airflow/example_dags/example_xcomargs.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the XComArgs."""
+
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.utils.dates import days_ago
+
+args = {
+    'owner': 'airflow',
+    'start_date': days_ago(2),
+}
+
+
+def dummy(*args, **kwargs):
+    """Dummy function"""
+    return "pass"
+
+
+with DAG(
+    dag_id='example_xcom_args',
+    default_args=args,
+    schedule_interval=None,
+    tags=['example']
+) as dag:
+    task1 = PythonOperator(
+        task_id='task1',
+        python_callable=dummy,
+    )
+
+    task2 = PythonOperator(
+        task_id='task2',
+        python_callable=dummy,
+        op_kwargs={"dummy": task1.output},
+    )
+
+    task3 = PythonOperator(
+        task_id='task3',
+        python_callable=dummy,
+    )
+
+    task3.op_kwargs = {"dummy": task2.output}

Review comment:
       Yes, I will move it to a test dags folder. I need a file because I have to create a dag bag. 







[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r422643152



##########
File path: airflow/models/baseoperator.py
##########
@@ -60,9 +60,25 @@
 ScheduleInterval = Union[str, timedelta, relativedelta]
 
 
+class BaseOperatorMeta(type):
+    """
+    Base metaclass of BaseOperator.
+    """
+    def __call__(cls, *args, **kwargs):
+        """
+        Called when you call BaseOperator(). In this way we are able to perform an action
+        after initializing an operator no matter where  the ``super().__init__`` is called
+        (before or after assign of new attributes in a custom operator).
+        """
+        obj = type.__call__(cls, *args, **kwargs)
+        # Set upstream task defined by XComArgs passed to template fields of an operator
+        obj._set_xcomargs_dependencies()  # pylint: disable=protected-access

Review comment:
       We can try to resolve those dependencies when creating a `DagRun`







[GitHub] [airflow] ashb commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r425008181



##########
File path: airflow/models/baseoperator.py
##########
@@ -634,6 +650,43 @@ def deps(self) -> Set[BaseTIDep]:
             NotPreviouslySkippedDep(),
         }
 
+    def _set_xcomargs_dependencies(self) -> None:
+        """
+        Resolves upstream dependencies of a task. In this way passing an ``XComArg``
+        as value for a template field will result in creating upstream relation between
+        two tasks.
+
+        **Example**: ::
+
+            with DAG(...):
+                generate_content = GenerateContentOperator(task_id="generate_content")
+                send_email = EmailOperator(..., html_content=generate_content.output)
+
+            # This is equivalent to
+            with DAG(...):
+                generate_content = GenerateContentOperator(task_id="generate_content")
+                send_email = EmailOperator(
+                    ..., html_content="{{ task_instance.xcom_pull('generate_content') }}"
+                )
+                generate_content >> send_email
+
+        """
+        from airflow.models.xcom_arg import XComArg
+
+        def apply_set_upstream(arg: Any):
+            if isinstance(arg, XComArg):
+                self.set_upstream(arg.operator)
+            elif isinstance(arg, (tuple, set, list)):
+                for elem in arg:
+                    apply_set_upstream(elem)
+            elif isinstance(arg, dict):

Review comment:
       https://github.com/apache/airflow/commit/d567f9ab8d5fdd6d18509fd8d623b26795eca25c
   
   Example of such a class from the docs/changelog:
   
   
   ```python
     class MyDataReader:
       template_fields = ['path']
       def __init__(self, my_path):
         self.path = my_path
       # [additional code here...]
     t = PythonOperator(
         task_id='transform_data',
         python_callable=transform_data,
         op_args=[
           MyDataReader('/tmp/{{ ds }}/my_file')
         ],
         dag=dag)
   ```







[GitHub] [airflow] turbaszek commented on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-630813143


   > Where has this concept come from? It wasn't in the AIP.
   
   It was proposed in https://github.com/apache/airflow/issues/8055 . Without it mixing functional and non-functional operators will be hard. 
   





[GitHub] [airflow] turbaszek commented on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636322533


   > Could we use the `__exit__` from the dag to fix this one case (but there are others that wouldn't be caught by that).
   
   Exactly
   
   > Given how rare metaprogramming is in Python, I think the metaclass approach gives a more complete solution, it catches all possible ways of assigning deps between a DAG.
   
   I agree, in our codebase, there's only one operator that will need to be fixed.
   
   > Yes, this is technically a breaking change, but I think it will affect almost no one. And I think the only "break" would be that using XComArg wouldn't work with a custom operator, unless your metaclass inherits this new metaclass. Is that correct? (i.e. the dag would still run, the user's metaclass would be the one that is used.)
   
   I think yes
   
   > So I'm +1 for metaclass, if I've understood it all correctly.
   
   Happy to bring it back!
   
   





[GitHub] [airflow] ashb commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r425010809



##########
File path: airflow/models/dag.py
##########
@@ -1468,6 +1468,10 @@ def create_dagrun(self,
         :param session: database session
         :type session: sqlalchemy.orm.session.Session
         """
+        # Resolve relationship between task set by XComArgs
+        for task in self.tasks:
+            task.set_xcomargs_dependencies()

Review comment:
       This is going to be too late to affect the Graph view of the webserver isn't it?
   
   (This either needs to go much, much earlier, to dag creation time so it is correctly reflected in the serialized representation, or this is not needed here at all.)







[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r428087205



##########
File path: airflow/models/baseoperator.py
##########
@@ -634,6 +634,43 @@ def deps(self) -> Set[BaseTIDep]:
             NotPreviouslySkippedDep(),
         }
 
+    def set_xcomargs_dependencies(self) -> None:

Review comment:
       What would you say to removing lineage from the scope of this PR? I can create an issue addressing this feature. 







[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r425141185



##########
File path: airflow/models/baseoperator.py
##########
@@ -634,6 +634,43 @@ def deps(self) -> Set[BaseTIDep]:
             NotPreviouslySkippedDep(),
         }
 
+    def set_xcomargs_dependencies(self) -> None:

Review comment:
       Hm, I'm not sure how this should be approached. What I would do is:
   ```python
               if isinstance(arg, XComArg):
                   op: BaseOperator = arg.operator
                   self.set_upstream(op)
                   # Picks up any outlets from direct upstream tasks that have outlets defined,
                   # as such that if A -> B and B does not have inlets but A has outlets, then
                   # these are provided as inlets to B.
                   self.add_inlets(op.get_outlet_defs())
   ```
   However, I am not sure if this is the right approach. Not all outlets of A have to be inlets of B. The proposed behaviour is what we get when we use `inlets=AUTO`, and I'm not sure this is something that should be done out of the box. 







[GitHub] [airflow] ashb commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r427282265



##########
File path: airflow/models/baseoperator.py
##########
@@ -634,6 +650,43 @@ def deps(self) -> Set[BaseTIDep]:
             NotPreviouslySkippedDep(),
         }
 
+    def _set_xcomargs_dependencies(self) -> None:
+        """
+        Resolves upstream dependencies of a task. In this way passing an ``XComArg``
+        as value for a template field will result in creating upstream relation between
+        two tasks.
+
+        **Example**: ::
+
+            with DAG(...):
+                generate_content = GenerateContentOperator(task_id="generate_content")
+                send_email = EmailOperator(..., html_content=generate_content.output)
+
+            # This is equivalent to
+            with DAG(...):
+                generate_content = GenerateContentOperator(task_id="generate_content")
+                send_email = EmailOperator(
+                    ..., html_content="{{ task_instance.xcom_pull('generate_content') }}"
+                )
+                generate_content >> send_email
+
+        """
+        from airflow.models.xcom_arg import XComArg
+
+        def apply_set_upstream(arg: Any):
+            if isinstance(arg, XComArg):
+                self.set_upstream(arg.operator)
+            elif isinstance(arg, (tuple, set, list)):
+                for elem in arg:
+                    apply_set_upstream(elem)
+            elif isinstance(arg, dict):

Review comment:
       @turbaszek 
   
   You haven't dealt with this case yet, have you?
   
   ```python
   
   class MyDataReader:
       template_fields = ['path']
       def __init__(self, my_path):
         self.path = my_path
   
   
   with dag:
     generate_content = GenerateContentOperator(task_id="generate_content")
     t = PythonOperator(
         task_id='transform_data',
         python_callable=transform_data,
         op_args=[
           MyDataReader(generate_content.output)
         ],
   )
   ```
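
One way the case above might be handled is to extend the recursive walk so that it also descends into objects declaring `template_fields`. This is a sketch under stated assumptions, not the PR's implementation: `XComArgStub` and `find_upstream` are hypothetical names, operators are represented by plain strings, and both the `dict` branch (walking the values) and the `template_fields` branch are assumptions, since the diff shown in this thread is cut off at the `dict` check.

```python
class XComArgStub:
    """Hypothetical stand-in for XComArg: remembers the producing operator."""

    def __init__(self, operator):
        self.operator = operator


def find_upstream(arg, found=None):
    """Collect operators referenced by XComArg-like values nested in ``arg``."""
    if found is None:
        found = []
    if isinstance(arg, XComArgStub):
        found.append(arg.operator)
    elif isinstance(arg, (tuple, set, list)):
        for elem in arg:
            find_upstream(elem, found)
    elif isinstance(arg, dict):
        # Assumption: only the values of a dict are inspected.
        for elem in arg.values():
            find_upstream(elem, found)
    elif hasattr(arg, "template_fields"):
        # Assumed extension: descend into objects that declare template_fields.
        for name in arg.template_fields:
            find_upstream(getattr(arg, name, None), found)
    return found


class MyDataReader:
    template_fields = ["path"]

    def __init__(self, my_path):
        self.path = my_path


op_args = [
    MyDataReader(XComArgStub("generate_content")),
    {"key": (XComArgStub("other_task"),)},
    "plain string",
]
upstream = find_upstream(op_args)
```

In a real operator each collected entry would be passed to `set_upstream` instead of being appended to a list.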







[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r422651378



##########
File path: airflow/models/baseoperator.py
##########
@@ -60,9 +60,25 @@
 ScheduleInterval = Union[str, timedelta, relativedelta]
 
 
+class BaseOperatorMeta(type):
+    """
+    Base metaclass of BaseOperator.
+    """
+    def __call__(cls, *args, **kwargs):
+        """
+        Called when you call BaseOperator(). In this way we are able to perform an action
+        after initializing an operator no matter where  the ``super().__init__`` is called
+        (before or after assign of new attributes in a custom operator).
+        """
+        obj = type.__call__(cls, *args, **kwargs)
+        # Set upstream task defined by XComArgs passed to template fields of an operator
+        obj._set_xcomargs_dependencies()  # pylint: disable=protected-access

Review comment:
       Additionally, I think we will have to introduce this metaclass sooner or later. Here's @mik-laj's PR where he tried to address the `@apply_defaults`:
   https://github.com/apache/airflow/pull/7450
   
   One of the reasons to do that (from [devlist thread](https://lists.apache.org/thread.html/r4cf535e5dec1fbf8330f3c86031519092ca4820b382d3738413b23cd%40%3Cdev.airflow.apache.org%3E)):
   > I saw that PR today and it made me realize I've not been using apply_defaults in much of my code! Look forward to this being an automatic operation via a MetaClass, great work :) 
   
   However, this point is very valid:
   > meta_class introduce backward compatibility problems for users who already have metaclass defined for their operator ( they will have to inherit from BaseOperatorMeta)
   
   I suppose that we can add some refactor step once we have 2.0 and https://github.com/apache/airflow/issues/8765 . 
   
   Of course we can try to add this in `DagBag.bag_dag` (like policy) or `DAG.create_dagrun` but this in my opinion is not the best approach. Happy to discuss @evgenyshulman @potiuk @kaxil 
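
The docstring in the diff above claims that a metaclass `__call__` can act after initialization regardless of where `super().__init__` is called. A minimal sketch of that difference follows. All class names are hypothetical, and `seen` is just a snapshot of the template fields taken by each hook.

```python
class InitHookBase:
    """Runs its hook at the end of the base ``__init__``."""

    template_fields = ("field",)

    def __init__(self):
        self.seen = {n: getattr(self, n, None) for n in self.template_fields}


class EarlySuper(InitHookBase):
    def __init__(self, field):
        super().__init__()  # hook runs here, before ``field`` is assigned
        self.field = field


class PostInitMeta(type):
    def __call__(cls, *args, **kwargs):
        obj = super().__call__(*args, **kwargs)
        # Runs after the most-derived __init__ has returned.
        obj.seen = {n: getattr(obj, n, None) for n in obj.template_fields}
        return obj


class MetaHookBase(metaclass=PostInitMeta):
    template_fields = ("field",)


class EarlySuperWithMeta(MetaHookBase):
    def __init__(self, field):
        super().__init__()
        self.field = field


missed = EarlySuper("value").seen
caught = EarlySuperWithMeta("value").seen
```

The hook in the base `__init__` misses attributes assigned after `super().__init__()`, while the metaclass hook sees the fully initialized object.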







[GitHub] [airflow] turbaszek commented on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636777316


   > > Why? We don't want to do any custom action (including resolving upstream) during execution. And it may happen that someone will assign/reassign value to template field attribute in execute()
   > 
   > This is in fact _exactly_ what `task.render_template_fields()` does, so yes, we do want to set something like this.
   
   @ashb true, but I think we don't want to set upstreams when executing a task, right?





[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r425141185



##########
File path: airflow/models/baseoperator.py
##########
@@ -634,6 +634,43 @@ def deps(self) -> Set[BaseTIDep]:
             NotPreviouslySkippedDep(),
         }
 
+    def set_xcomargs_dependencies(self) -> None:

Review comment:
       Hm, I'm not sure how this should be approached. What I would do is:
   ```python
      def set_xcomargs_dependencies(self) -> None:
           from airflow.models.xcom_arg import XComArg
   
           def apply_set_upstream_and_lineage(arg: Any):
               if isinstance(arg, XComArg):
                   op: BaseOperator = arg.operator
                   self.set_upstream(op)
                   # Picks up any outlets from direct upstream tasks that have outlets defined,
                   # as such that if A -> B and B does not have inlets but A has outlets, then
                   # these are provided as inlets to B.
                   self.add_inlets(op.get_outlet_defs())
                 ...
   
           for field in self.template_fields:
               arg = getattr(self, field)
               apply_set_upstream_and_lineage(arg)
   ```
   However, I am not sure if this is the right approach. Not all outlets of A have to be inlets of B. The proposed behaviour is what we get when we use `inlets=AUTO`, and I'm not sure this is something that should be done out of the box. 







[GitHub] [airflow] turbaszek merged pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek merged pull request #8805:
URL: https://github.com/apache/airflow/pull/8805


   





[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r425101458



##########
File path: airflow/models/dag.py
##########
@@ -1468,6 +1468,10 @@ def create_dagrun(self,
         :param session: database session
         :type session: sqlalchemy.orm.session.Session
         """
+        # Resolve relationship between task set by XComArgs
+        for task in self.tasks:
+            task.set_xcomargs_dependencies()

Review comment:
       That's my main concern: we would have to keep this logic in more than one place (the dag bag, and somewhere around dag creation when no dag bag is involved). I will add tests for serialization and see what can be done.







[GitHub] [airflow] ashb commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r433166090



##########
File path: tests/test_utils/__init__.py
##########
@@ -20,3 +20,11 @@
 AIRFLOW_MAIN_FOLDER = os.path.realpath(
     os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir, os.pardir)
 )
+
+EXAMPLE_DAGS_FOLDER = os.path.join(
+    AIRFLOW_MAIN_FOLDER, "airflow", "example_dags"
+)
+
+TEST_DAGS_FOLDER = os.path.realpath(

Review comment:
       This is already set from airflow.settings.TEST_DAGS_FOLDER - we don't need another constant

##########
File path: tests/models/test_baseoperator.py
##########
@@ -347,3 +350,87 @@ def test_lineage_composition(self):
         task4 = DummyOperator(task_id="op4", dag=dag)
         task4 > [inlet, outlet, extra]
         self.assertEqual(task4.get_outlet_defs(), [inlet, outlet, extra])
+
+
+class CustomOp(DummyOperator):
+    template_fields = ("field", "field2")
+
+    @apply_defaults
+    def __init__(self, field=None, field2=None, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.field = field
+        self.field2 = field2
+
+    def execute(self, context):
+        self.field = None
+
+
+class TestXComArgsRelationsAreResolved:
+    def test_setattr_performs_no_custom_action_at_execute_time(self):
+        op = CustomOp(task_id="test_task")
+        op.lock_for_execution()
+
+        with mock.patch(
+            "airflow.models.baseoperator.BaseOperator.set_xcomargs_dependencies"
+        ) as method_mock:
+            op.execute({})
+        assert method_mock.call_count == 0
+
+    def test_upstream_is_set_when_template_field_is_xcomarg(self):
+        with DAG("xcomargs_test", default_args={"start_date": datetime.today()}):
+            op1 = DummyOperator(task_id="op1")
+            op2 = CustomOp(task_id="op2", field=op1.output)
+
+        assert op1 in op2.upstream_list
+        assert op2 in op1.downstream_list
+
+    def test_set_xcomargs_dependencies_works_recursively(self):
+        with DAG("xcomargs_test", default_args={"start_date": datetime.today()}):
+            op1 = DummyOperator(task_id="op1")
+            op2 = DummyOperator(task_id="op2")
+            op3 = CustomOp(task_id="op3", field=[op1.output, op2.output])
+            op4 = CustomOp(task_id="op4", field={"op1": op1.output, "op2": op2.output})
+
+        assert op1 in op3.upstream_list
+        assert op2 in op3.upstream_list
+        assert op1 in op4.upstream_list
+        assert op2 in op4.upstream_list
+
+    def test_set_xcomargs_dependencies_works_when_set_after_init(self):
+        with DAG(dag_id='xcomargs_test', default_args={"start_date": datetime.today()}):
+            op1 = DummyOperator(task_id="op1")
+            op2 = CustomOp(task_id="op2")
+            op2.field = op1.output  # value is set after init
+
+        assert op1 in op2.upstream_list
+
+    def test_set_xcomargs_dependencies_error_when_outside_dag(self):
+        with pytest.raises(AirflowException):
+            op1 = DummyOperator(task_id="op1")
+            CustomOp(task_id="op2", field=op1.output)
+
+    def test_set_xcomargs_dependencies_when_creating_dagbag(self):
+        dag_bag = DagBag(dag_folder=TEST_DAGS_FOLDER, include_examples=False)
+        dag_id = "xcomargs_test_1"
+        dag: DAG = dag_bag.get_dag(dag_id)

Review comment:
       i.e. just create this DAG here, as it isn't used anywhere else.
   
   Same for xcomargs_test_2

##########
File path: airflow/models/baseoperator.py
##########
@@ -292,6 +311,12 @@ class derived from this one results in the creation of a task object,
     # Defines if the operator supports lineage without manual definitions
     supports_lineage = False
 
+    # If True then the class constructor was called
+    _instantiated = False

Review comment:
       ```suggestion
       __instantiated = False
   ```
   
   I think we can do this, which then means this is much much less likely to clash with any fields in a subclass.
   
   Internally Python converts `__x` to `_BaseOperator__x` when used: https://docs.python.org/3/tutorial/classes.html#tut-private
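
A small, Airflow-free illustration of the name mangling described above. `Base` and `Child` are hypothetical classes used only to show that a double-underscore attribute in a subclass does not collide with the one owned by its parent.

```python
class Base:
    # Stored on the class under the mangled name _Base__instantiated.
    __instantiated = False

    def is_instantiated(self) -> bool:
        # Inside Base, this is rewritten to self._Base__instantiated.
        return self.__instantiated


class Child(Base):
    def __init__(self) -> None:
        # Inside Child, this is rewritten to self._Child__instantiated,
        # so it does not overwrite the attribute owned by Base.
        self.__instantiated = True


child = Child()
print(child.is_instantiated())       # value seen by Base
print(child._Child__instantiated)    # value set by Child
```

One consequence worth keeping in mind: mangling uses the name of the class whose body contains the code. Code inside a metaclass that writes `obj.__instantiated` would therefore be rewritten using the metaclass's name, so it would have to spell out `obj._BaseOperator__instantiated` explicitly.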

##########
File path: tests/dags/test_xcomargs_dag.py
##########
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the XComArgs."""
+from datetime import datetime
+
+from airflow import DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.dates import days_ago
+from airflow.utils.decorators import apply_defaults
+
+
+class CustomOp(DummyOperator):
+    template_fields = ("field",)
+
+    @apply_defaults
+    def __init__(self, field=None, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.field = field
+
+
+args = {
+    'owner': 'airflow',
+    'start_date': days_ago(2),
+}
+
+
+def dummy(*args, **kwargs):
+    """Dummy function"""
+    return "pass"
+
+
+with DAG(dag_id='xcomargs_test_1', default_args={"start_date": datetime.today()}) as dag1:

Review comment:
       Every dag we add to tests/dags slows down our many tests, so unless we _need_ the dag to exist in a normal dags folder (i.e. when we are running it via a full scheduler/backfill) we should just create dags in the unit tests please. 

##########
File path: airflow/models/baseoperator.py
##########
@@ -633,6 +670,51 @@ def deps(self) -> Set[BaseTIDep]:
             NotPreviouslySkippedDep(),
         }
 
+    def lock_for_execution(self) -> None:
+        """Sets _lock_for_execution to True"""

Review comment:
       This description is somewhat self-referential - I could have guessed this is what it did based on the name of the method.
   
   Either say what this setting actually does, or remove the doc comment please -- having this doc doesn't add any value.

##########
File path: airflow/models/taskinstance.py
##########
@@ -968,6 +968,7 @@ def _run_raw_task(
                 context = self.get_template_context()
 
                 task_copy = copy.copy(task)
+                task_copy.lock_for_execution()

Review comment:
       How about we combine both of these into a single function `task.prepare_for_execution()` that returns a new copied instance with the "lock" set.
   
   We also do `task_copy = copy.copy(task)` in the `dry_run` fn later on -- we should use it there too.
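
A minimal sketch of what such a combined helper could look like. `Task` is a hypothetical stand-in, not Airflow's `BaseOperator`, and the flag name `_lock_for_execution` is taken from the docstring quoted in the diff above.

```python
import copy


class Task:
    """Hypothetical stand-in for an operator."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self._lock_for_execution = False

    def prepare_for_execution(self) -> "Task":
        """Return a shallow copy of this task that is flagged as executing.

        The original task is left untouched, so the DAG-level object can
        still be modified while the copy is run.
        """
        other = copy.copy(self)
        other._lock_for_execution = True
        return other


task = Task("op1")
task_copy = task.prepare_for_execution()
```

Both call sites mentioned in the comment (`_run_raw_task` and `dry_run`) could then replace their `copy.copy(task)` call with this single method, assuming they need the same lock semantics.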

##########
File path: tests/models/test_baseoperator.py
##########
@@ -347,3 +350,87 @@ def test_lineage_composition(self):
         task4 = DummyOperator(task_id="op4", dag=dag)
         task4 > [inlet, outlet, extra]
         self.assertEqual(task4.get_outlet_defs(), [inlet, outlet, extra])
+
+
+class CustomOp(DummyOperator):
+    template_fields = ("field", "field2")
+
+    @apply_defaults
+    def __init__(self, field=None, field2=None, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.field = field
+        self.field2 = field2
+
+    def execute(self, context):
+        self.field = None
+
+
+class TestXComArgsRelationsAreResolved:
+    def test_setattr_performs_no_custom_action_at_execute_time(self):
+        op = CustomOp(task_id="test_task")
+        op.lock_for_execution()
+
+        with mock.patch(
+            "airflow.models.baseoperator.BaseOperator.set_xcomargs_dependencies"
+        ) as method_mock:
+            op.execute({})
+        assert method_mock.call_count == 0
+
+    def test_upstream_is_set_when_template_field_is_xcomarg(self):
+        with DAG("xcomargs_test", default_args={"start_date": datetime.today()}):
+            op1 = DummyOperator(task_id="op1")
+            op2 = CustomOp(task_id="op2", field=op1.output)
+
+        assert op1 in op2.upstream_list
+        assert op2 in op1.downstream_list
+
+    def test_set_xcomargs_dependencies_works_recursively(self):
+        with DAG("xcomargs_test", default_args={"start_date": datetime.today()}):
+            op1 = DummyOperator(task_id="op1")
+            op2 = DummyOperator(task_id="op2")
+            op3 = CustomOp(task_id="op3", field=[op1.output, op2.output])
+            op4 = CustomOp(task_id="op4", field={"op1": op1.output, "op2": op2.output})
+
+        assert op1 in op3.upstream_list
+        assert op2 in op3.upstream_list
+        assert op1 in op4.upstream_list
+        assert op2 in op4.upstream_list
+
+    def test_set_xcomargs_dependencies_works_when_set_after_init(self):
+        with DAG(dag_id='xcomargs_test', default_args={"start_date": datetime.today()}):
+            op1 = DummyOperator(task_id="op1")
+            op2 = CustomOp(task_id="op2")
+            op2.field = op1.output  # value is set after init
+
+        assert op1 in op2.upstream_list
+
+    def test_set_xcomargs_dependencies_error_when_outside_dag(self):
+        with pytest.raises(AirflowException):
+            op1 = DummyOperator(task_id="op1")
+            CustomOp(task_id="op2", field=op1.output)
+
+    def test_set_xcomargs_dependencies_when_creating_dagbag(self):
+        dag_bag = DagBag(dag_folder=TEST_DAGS_FOLDER, include_examples=False)
+        dag_id = "xcomargs_test_1"
+        dag: DAG = dag_bag.get_dag(dag_id)

Review comment:
       (Not having to load the full test dag bag makes this test much quicker as well.)







[GitHub] [airflow] ashb commented on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636328816


   No comment on performance, but an observation: we don't need to check for just template fields if we are doing that - we could let it work when setting _any_ attribute to an XComArg, which would be nice.





[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r422858966



##########
File path: airflow/models/baseoperator.py
##########
@@ -60,9 +60,25 @@
 ScheduleInterval = Union[str, timedelta, relativedelta]
 
 
+class BaseOperatorMeta(type):
+    """
+    Base metaclass of BaseOperator.
+    """
+    def __call__(cls, *args, **kwargs):
+        """
+        Called when you call BaseOperator(). In this way we are able to perform an action
+        after initializing an operator no matter where  the ``super().__init__`` is called
+        (before or after assign of new attributes in a custom operator).
+        """
+        obj = type.__call__(cls, *args, **kwargs)
+        # Set upstream task defined by XComArgs passed to template fields of an operator
+        obj._set_xcomargs_dependencies()  # pylint: disable=protected-access

Review comment:
       > I would vote for dagbag "post" action. 
   
   This would require some additional changes to make it work everywhere. For example, running `dag.run()` doesn't create a DagBag, so the relation would not be set. 
   
   > people can assign values not only after super().init in subclass, but also in the DAG definition file as well.
   
   I am not sure I follow. To use `a.output` you first have to define `a`, so that is where we resolve the upstream for `a`. With the "functional" approach, users have to define the upstream tasks of a task `a` before they define `a`; otherwise they will not be able to use the `output` parameter. 
   










[GitHub] [airflow] turbaszek edited a comment on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek edited a comment on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636334773


   I did a simple test:
   ```python
   from datetime import datetime
   
   from airflow import DAG
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.utils.decorators import apply_defaults
   from scripts.perf.perf_kit import python, memory, repeat_and_time
   
   fields = [f"field{i}" for i in range(30)]
   
   
   class CustomOp(DummyOperator):
       template_fields = fields  # 30 template fields
   
       @apply_defaults
       def __init__(self, *args, **kwargs):
           super().__init__(*args, task_id=kwargs["task_id"])
           for key in kwargs:
               if key.startswith("field"):
                   setattr(self, key, kwargs[key])
   
   
   if __name__ == '__main__':
       N = 10
       OP_N = 100
   
       @repeat_and_time.timing(N)
       @repeat_and_time.repeat(N)
       def case():
           with DAG("xcomargs_test", default_args={"start_date": datetime.today()}):
               op1 = DummyOperator(task_id="op1")
               for i in range(OP_N):
                   kwargs = {k: op1.output for k in fields}
                   CustomOp(task_id=f"task_{i}", **kwargs)
   
       print("OPs: ", OP_N)
       case()
   ```
   Average time for 100 OPs in single DAG:
   - metaclass: 471.470 ms
   - setattr + if: 7072.531 ms
   - setattr: 7208.424 ms
   
   So using setattr is **14** times slower than metaclass... 
   
   But to make everything work smoothly we can mix those two approaches:
   ```python
   class BaseOperatorMeta(type):
       def __call__(cls, *args, **kwargs):
           obj: BaseOperator = type.__call__(cls, *args, **kwargs)
           # Set upstream task defined by XComArgs passed to template fields of an operator
           obj.set_xcomargs_dependencies()
           obj._instantiated = True
           return obj
   
   
   class BaseOperator(Operator, LoggingMixin, metaclass=BaseOperatorMeta):
       _instantiated = False
   
       def __setattr__(self, key, value):
           super().__setattr__(key, value)
           if self._instantiated and key in self.template_fields:
               self.set_xcomargs_dependencies()
   ```
   
   This gives the following result: **471.381 ms**
   
   Edit: additionally, we should add a `_locked_for_execution` flag to BaseOperator and set it to True before executing. Then adjust `__setattr__` to
   ```python
       def __setattr__(self, key, value):
           super().__setattr__(key, value)
           if self._instantiated and not self._locked_for_execution and key in self.template_fields:
               self.set_xcomargs_dependencies()
   ```
   Why? We don't want to do any custom action (including resolving upstream) during execution. And it may happen that someone will assign/reassign a value to a template field attribute in `execute()`.
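
The hybrid approach above can be tried out without Airflow. The sketch below is an assumption-laden stand-in: `OperatorMeta` and `Operator` are hypothetical names, and `set_xcomargs_dependencies` only counts how often it is called instead of resolving anything.

```python
class OperatorMeta(type):
    def __call__(cls, *args, **kwargs):
        # Runs the whole __init__ chain first, then resolves once.
        obj = super().__call__(*args, **kwargs)
        obj.set_xcomargs_dependencies()
        obj._instantiated = True
        return obj


class Operator(metaclass=OperatorMeta):
    template_fields = ("field",)
    _instantiated = False
    _locked_for_execution = False

    def __init__(self, field=None):
        self.resolve_calls = 0
        self.field = field

    def __setattr__(self, key, value):
        super().__setattr__(key, value)
        if (
            self._instantiated
            and not self._locked_for_execution
            and key in self.template_fields
        ):
            self.set_xcomargs_dependencies()

    def set_xcomargs_dependencies(self):
        # Stand-in for the real resolution: just count invocations.
        self.resolve_calls += 1


counts = []
op = Operator(field="a")
counts.append(op.resolve_calls)   # one call, made by the metaclass

op.field = "b"                    # template field set after init
counts.append(op.resolve_calls)

op.unrelated = 1                  # not a template field
counts.append(op.resolve_calls)

op._locked_for_execution = True
op.field = None                   # e.g. reassigned inside execute()
counts.append(op.resolve_calls)

print(counts)
```

Assignments made inside `__init__` do not trigger resolution because `_instantiated` is still `False` at that point, which is where the performance gain over plain `__setattr__` comes from in this sketch.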





[GitHub] [airflow] ashb commented on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-635400916


   > ```python
   > with DAG(dag_id='xcomargs_test_3', default_args={"start_date": datetime.today()}) as dag3:
   >     op1 = DummyOperator(task_id="op1")
   >     op2 = CustomOp(task_id="op2", field=op1.output)
   > 
   > assert op1 in op2.upstream_list  # False
   > ```
   
   :interrobang:
   
   Fixing that usecase is like 90% of the case I would want to fix with this PR.
   
   Could we use the `__exit__` from the dag to fix this one case? (But there are others that wouldn't be caught by that.)
   
   Given how rare metaprogramming is in Python, I think the metaclass approach gives a more complete solution: it catches all possible ways of assigning deps between tasks in a DAG.
   
   Yes, this is technically a breaking change, but I think it will affect almost no one. And I think the only "break" would be that using XComArg wouldn't work with a custom operator, unless your metaclass inherits this new metaclass. Is that correct? (i.e. the dag would still run, the user's metaclass would be the one that is used.)
   
   So I'm +1 for metaclass, if I've understood it all correctly.
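
On the question of custom metaclasses, plain Python behaviour is worth checking. The sketch below uses hypothetical stand-in classes (no Airflow imports) and shows what happens when a subclass declares a metaclass that does not inherit from the base class's metaclass. Whether any real Airflow operator is affected is not something this sketch can tell.

```python
class BaseOperatorMeta(type):
    """Stand-in for the metaclass proposed in the PR."""


class BaseOperator(metaclass=BaseOperatorMeta):
    pass


class UnrelatedMeta(type):
    """A user metaclass that does not inherit from BaseOperatorMeta."""


class CompatibleMeta(BaseOperatorMeta):
    """A user metaclass that does inherit from BaseOperatorMeta."""


try:
    class Broken(BaseOperator, metaclass=UnrelatedMeta):
        pass

    conflict = None
except TypeError as err:
    # Python refuses to create the class at definition time.
    conflict = str(err)


class Works(BaseOperator, metaclass=CompatibleMeta):
    pass


print(conflict)
print(type(Works).__name__)
```

In this setup the incompatible case fails with a `TypeError` when the class is defined, rather than running with the user's metaclass, so affected users would need their metaclass to inherit from the new one.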





[GitHub] [airflow] turbaszek edited a comment on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek edited a comment on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636334773


   I did a simple test:
   ```python
   from datetime import datetime
   
   from airflow import DAG
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.utils.decorators import apply_defaults
   from scripts.perf.perf_kit import python, memory, repeat_and_time
   
   fields = [f"field{i}" for i in range(30)]
   
   
   class CustomOp(DummyOperator):
       template_fields = fields  # 30 template fields
   
       @apply_defaults
       def __init__(self, *args, **kwargs):
           super().__init__(*args, task_id=kwargs["task_id"])
           for key in kwargs:
               if key.startswith("field"):
                   setattr(self, key, kwargs[key])
   
   
   if __name__ == '__main__':
       N = 10
       OP_N = 100
   
       @repeat_and_time.timing(N)
       @repeat_and_time.repeat(N)
       def case():
           with DAG("xcomargs_test", default_args={"start_date": datetime.today()}):
               op1 = DummyOperator(task_id="op1")
               for i in range(OP_N):
                   kwargs = {k: op1.output for k in fields}
                   CustomOp(task_id=f"task_{i}", **kwargs)
   
       print("OPs: ", OP_N)
       case()
   ```
   Average time for 100 OPs in single DAG:
   - metaclass: 492.990 ms
   - setattr + if: 7072.531 ms
   - setattr: 7208.424 ms
   
   So using setattr is **14** times slower than metaclass... 
   
   But to make everything work smoothly we can mix those two approaches:
   ```python
   class BaseOperatorMeta(type):
       def __call__(cls, *args, **kwargs):
           obj: BaseOperator = type.__call__(cls, *args, **kwargs)
           # Set upstream task defined by XComArgs passed to template fields of an operator
           obj.set_xcomargs_dependencies()
           obj._instantiated = True
           return obj
   
   
   class BaseOperator(Operator, LoggingMixin, metaclass=BaseOperatorMeta):
       _instantiated = False
   
       def __setattr__(self, key, value):
           super().__setattr__(key, value)
           if self._instantiated and key in self.template_fields:
               self.set_xcomargs_dependencies()
   ```
   
   This gives the following result: **471.381 ms**





[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r422861355



##########
File path: airflow/models/baseoperator.py
##########
@@ -672,10 +672,20 @@ def _set_xcomargs_dependencies(self) -> None:
 
         """
         from airflow.models.xcom_arg import XComArg
-        for field in self.template_fields:
-            arg = getattr(self, field)
+
+        def apply_set_upstream(arg: Any):
             if isinstance(arg, XComArg):

Review comment:
       > it can be that something will fail and this feature will become a "blocker" to update to the new version
   
   Can you provide an example? We perform an action only when we encounter an `XComArg`, so `self.set_upstream` should work. If it doesn't work, then it means that our implementation is broken.  
   







[GitHub] [airflow] turbaszek edited a comment on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek edited a comment on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636334773


   I did a simple test:
   ```python
   from datetime import datetime
   
   from airflow import DAG
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.utils.decorators import apply_defaults
   from scripts.perf.perf_kit import python, memory, repeat_and_time
   
   fields = [f"field{i}" for i in range(30)]
   
   
   class CustomOp(DummyOperator):
       template_fields = fields
   
       @apply_defaults
       def __init__(self, *args, **kwargs):
           super().__init__(*args, task_id=kwargs["task_id"])
           for key in kwargs:
               if key.startswith("field"):
                   setattr(self, key, kwargs[key])
   
   
   if __name__ == '__main__':
       N = 10
       DAG_N = 100
   
       @repeat_and_time.timing(N)
       @repeat_and_time.repeat(N)
       def case():
           with DAG("xcomargs_test", default_args={"start_date": datetime.today()}):
               op1 = DummyOperator(task_id="op1")
               for i in range(DAG_N):
                   kwargs = {k: op1.output for k in fields}
                   CustomOp(task_id=f"task_{i}", **kwargs)
   
       print("DAGs: ", DAG_N)
       case()
   ```
   Results for 100 DAGs:
   - metaclass: 492.990 ms
   - setattr + if: 7072.531 ms
   - setattr: 7208.424 ms
   
   So using setattr is **14** times slower than metaclass... 





[GitHub] [airflow] turbaszek commented on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636334773


   I did a simple test:
   ```python
   from datetime import datetime
   
   from airflow import DAG
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.utils.decorators import apply_defaults
   from scripts.perf.perf_kit import python, memory, repeat_and_time
   
   fields = [f"field{i}" for i in range(30)]
   
   
   class CustomOp(DummyOperator):
       template_fields = fields
   
       @apply_defaults
       def __init__(self, *args, **kwargs):
           super().__init__(*args, task_id=kwargs["task_id"])
           for key in kwargs:
               if key.startswith("field"):
                   setattr(self, key, kwargs[key])
   
   
   if __name__ == '__main__':
       N = 10
       DAG_N = 100
   
       @repeat_and_time.timing(N)
       @repeat_and_time.repeat(N)
       def case():
           with DAG("xcomargs_test", default_args={"start_date": datetime.today()}):
               op1 = DummyOperator(task_id="op1")
               for i in range(DAG_N):
                   kwargs = {k: op1.output for k in fields}
                   CustomOp(task_id=f"task_{i}", **kwargs)
   
       print("DAGs: ", DAG_N)
       case()
   ```
   Results for 100 DAGs:
   - metaclass: 492.990 ms
   - setattr + if: 7072.531 ms
   - setattr: 7208.424 ms
   
   So using setattr is **14** times slower than metaclass... 





[GitHub] [airflow] turbaszek edited a comment on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek edited a comment on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636329264


   > we could let it work when setting _any_ attribute to an XComArg, which would be nice
   
   That would mean that if we set 20 attributes in `__init__` then we run `self.set_xcomargs_dependencies` 20 times. This was the reason for the if. 
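
The cost described above can be made concrete with a toy counter. `EagerOperator` and `FilteredOperator` are hypothetical classes, and `set_xcomargs_dependencies` is reduced to a call counter, so this only illustrates how often the hook would fire, not how long the real method takes.

```python
class EagerOperator:
    """Re-resolves dependencies on every attribute assignment."""

    resolve_calls = 0

    def __init__(self):
        # Mimics an __init__ that sets 20 attributes.
        for i in range(20):
            setattr(self, f"attr{i}", i)

    def __setattr__(self, key, value):
        object.__setattr__(self, key, value)
        if key != "resolve_calls":  # skip the counter itself
            self.set_xcomargs_dependencies()

    def set_xcomargs_dependencies(self):
        self.resolve_calls += 1


class FilteredOperator(EagerOperator):
    """Re-resolves only when a template field is assigned (the `if`)."""

    template_fields = ("attr0",)

    def __setattr__(self, key, value):
        object.__setattr__(self, key, value)
        if key in self.template_fields:
            self.set_xcomargs_dependencies()


eager = EagerOperator()
filtered = FilteredOperator()
print(eager.resolve_calls, filtered.resolve_calls)
```

The eager variant fires once per assignment, while the filtered one fires only for the single template field.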
   
   





[GitHub] [airflow] turbaszek edited a comment on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek edited a comment on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636334773


   I did a simple test:
   ```python
   from datetime import datetime
   
   from airflow import DAG
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.utils.decorators import apply_defaults
   from scripts.perf.perf_kit import python, memory, repeat_and_time
   
   fields = [f"field{i}" for i in range(30)]
   
   
   class CustomOp(DummyOperator):
       template_fields = fields  # 30 template fields
   
       @apply_defaults
       def __init__(self, *args, **kwargs):
           super().__init__(*args, task_id=kwargs["task_id"])
           for key in kwargs:
               if key.startswith("field"):
                   setattr(self, key, kwargs[key])
   
   
   if __name__ == '__main__':
       N = 10
       OP_N = 100
   
       @repeat_and_time.timing(N)
       @repeat_and_time.repeat(N)
       def case():
           with DAG("xcomargs_test", default_args={"start_date": datetime.today()}):
               op1 = DummyOperator(task_id="op1")
               for i in range(OP_N):
                   kwargs = {k: op1.output for k in fields}
                   CustomOp(task_id=f"task_{i}", **kwargs)
   
       print("OPs: ", OP_N)
       case()
   ```
   Average time for 100 OPs in single DAG:
   - metaclass: 471.470 ms
   - setattr + if: 7072.531 ms
   - setattr: 7208.424 ms
   
   So using setattr is roughly **15x** slower than the metaclass approach.
   
   But to make everything work smoothly we can mix those two approaches:
   ```python
   class BaseOperatorMeta(type):
       def __call__(cls, *args, **kwargs):
           obj: BaseOperator = type.__call__(cls, *args, **kwargs)
           # Set upstream task defined by XComArgs passed to template fields of an operator
           obj.set_xcomargs_dependencies()
           obj._instantiated = True
           return obj
   
   
   class BaseOperator(Operator, LoggingMixin, metaclass=BaseOperatorMeta):
       _instantiated = False
   
       def __setattr__(self, key, value):
           super().__setattr__(key, value)
           if self._instantiated and key in self.template_fields:
               self.set_xcomargs_dependencies()
   ```
   
   This gives the following result: **471.381 ms**
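
To make the mixed approach easier to follow outside of Airflow, here is a minimal, self-contained sketch. `XComRef`, `OperatorMeta`, `Operator`, `resolve_dependencies` and `resolve_calls` are hypothetical stand-ins invented for illustration; they are not the Airflow classes from this PR. The point is only that the metaclass resolves once after construction, and `__setattr__` reacts only to later assignments.

```python
class XComRef:
    """Hypothetical stand-in for XComArg: it only remembers the producing task."""

    def __init__(self, operator):
        self.operator = operator


class OperatorMeta(type):
    def __call__(cls, *args, **kwargs):
        # Runs __new__ and the complete __init__ chain of the subclass first.
        obj = super().__call__(*args, **kwargs)
        obj.resolve_dependencies()  # a single pass after construction
        obj._instantiated = True    # from now on __setattr__ reacts
        return obj


class Operator(metaclass=OperatorMeta):
    template_fields = ()
    _instantiated = False

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.resolve_calls = 0  # counter used only to show how often we resolve

    def __setattr__(self, key, value):
        super().__setattr__(key, value)
        # Skipped during __init__, because _instantiated is still False there.
        if self._instantiated and key in self.template_fields:
            self.resolve_dependencies()

    def resolve_dependencies(self):
        self.resolve_calls += 1
        for field in self.template_fields:
            value = getattr(self, field, None)
            if isinstance(value, XComRef):
                self.upstream.add(value.operator.task_id)


class CustomOp(Operator):
    template_fields = ("field",)

    def __init__(self, task_id, field=None):
        super().__init__(task_id)
        self.field = field  # assigned after super().__init__()


producer = Operator("producer")
consumer = CustomOp("consumer", field=XComRef(producer))

late = CustomOp("late")
late.field = XComRef(producer)  # assigned after construction
```

In this sketch `consumer` resolves exactly once (from the metaclass), while `late` resolves a second time when the template field is reassigned.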





[GitHub] [airflow] evgenyshulman commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
evgenyshulman commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r422632621



##########
File path: airflow/models/baseoperator.py
##########
@@ -60,9 +60,25 @@
 ScheduleInterval = Union[str, timedelta, relativedelta]
 
 
+class BaseOperatorMeta(type):
+    """
+    Base metaclass of BaseOperator.
+    """
+    def __call__(cls, *args, **kwargs):
+        """
+        Called when you call BaseOperator(). In this way we are able to perform an action
+        after initializing an operator no matter where  the ``super().__init__`` is called
+        (before or after assign of new attributes in a custom operator).
+        """
+        obj = type.__call__(cls, *args, **kwargs)
+        # Set upstream task defined by XComArgs passed to template fields of an operator
+        obj._set_xcomargs_dependencies()  # pylint: disable=protected-access

Review comment:
        The metaclass introduces a backward-compatibility problem for users who already define a metaclass for their operator (they will have to inherit from BaseOperatorMeta).
    
    What happens when a user creates an operator without a DAG?
    1. We can fix set_upstream.
    2. We can just finalize the Operator implementation when we add it to the DAG (thus no metaclass is required).
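
The compatibility concern can be reproduced with plain Python. The sketch below uses empty stand-in classes (the names `UserMeta`, `BrokenOp`, `FixedUserMeta` and `FixedOp` are hypothetical, and `BaseOperatorMeta`/`BaseOperator` here are placeholders, not the Airflow implementations). It shows that a user metaclass unrelated to the base metaclass is rejected at class-definition time, and that deriving from both metaclasses is one way around it.

```python
class BaseOperatorMeta(type):
    """Placeholder for the metaclass proposed in this PR."""


class BaseOperator(metaclass=BaseOperatorMeta):
    """Placeholder base class."""


class UserMeta(type):
    """A metaclass a user may already have; unrelated to BaseOperatorMeta."""


try:
    # Python requires the metaclass of a derived class to be a subclass
    # of the metaclasses of all its bases, so this definition fails.
    class BrokenOp(BaseOperator, metaclass=UserMeta):
        pass

    conflict = None
except TypeError as err:
    conflict = str(err)


class FixedUserMeta(UserMeta, BaseOperatorMeta):
    """Combining both metaclasses resolves the conflict."""


class FixedOp(BaseOperator, metaclass=FixedUserMeta):
    pass
```

So, at least in plain Python, such users would hit a `TypeError` when their operator class is defined, rather than silently losing the XComArg resolution.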

##########
File path: airflow/models/baseoperator.py
##########
@@ -634,6 +651,33 @@ def deps(self) -> Set[BaseTIDep]:
             NotPreviouslySkippedDep(),
         }
 
+    def _set_xcomargs_dependencies(self) -> None:
+        """
+        Resolves upstream dependencies of a task. In this way passing an ``XComArg``
+        as value for a template field will result in creating upstream relation between
+        two tasks.
+
+        **Example**: ::
+
+            with DAG(...):
+                generate_content = GenerateContentOperator(task_id="generate_content")
+                send_email = EmailOperator(..., html_content=generate_content.output)
+
+            # This is equivalent to
+            with DAG(...):
+                generate_content = GenerateContentOperator(task_id="generate_content")
+                send_email = EmailOperator(
+                    ..., html_content="{{ task_instance.xcom_pull('generate_content') }}"
+                )
+                generate_content >> send_email
+
+        """
+        from airflow.models.xcom_arg import XComArg
+        for field in self.template_fields:
+            arg = getattr(self, field)
+            if isinstance(arg, XComArg):
+                self.set_upstream(arg.operator)

Review comment:
       What about PythonOperator? Template fields are usually traversed with the `operator.render_template` call; we should probably do the same here.
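
As an illustration of why a flat `isinstance` check is not enough for fields such as `op_kwargs`, here is a small recursive traversal sketch. It is not the `render_template` implementation; `XComRef` and `iter_xcom_refs` are hypothetical names, and the set of container types handled (dict, list, tuple, set) is an assumption.

```python
from typing import Any, Iterator


class XComRef:
    """Hypothetical stand-in for XComArg, identified by the producing task id."""

    def __init__(self, task_id: str):
        self.task_id = task_id


def iter_xcom_refs(value: Any) -> Iterator[XComRef]:
    """Yield every XComRef found in a possibly nested template field value."""
    if isinstance(value, XComRef):
        yield value
    elif isinstance(value, dict):
        for item in value.values():
            yield from iter_xcom_refs(item)
    elif isinstance(value, (list, tuple, set)):
        for item in value:
            yield from iter_xcom_refs(item)
    # Strings, numbers and other objects are deliberately not traversed.


op_kwargs = {
    "a": XComRef("extract"),
    "b": [1, XComRef("transform")],
    "c": "plain string",
}
found = sorted(ref.task_id for ref in iter_xcom_refs(op_kwargs))
```

With a traversal like this, each reference found inside `op_kwargs` could be turned into an upstream relation in the same way as a top-level template field.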

##########
File path: tests/models/test_baseoperator.py
##########
@@ -262,6 +263,33 @@ def test_email_on_actions(self):
         assert test_task.email_on_retry is False
         assert test_task.email_on_failure is True
 
+    def test_upstream_is_set_when_template_field_is_xcomarg(self):
+        class CustomOpSuperBefore(DummyOperator):
+            template_fields = ("field",)
+
+            @apply_defaults
+            def __init__(self, field, *args, **kwargs):
+                super().__init__(*args, **kwargs)
+                self.field = field
+
+        class CustomOpSuperAfter(DummyOperator):
+            template_fields = ("field",)
+
+            @apply_defaults
+            def __init__(self, field, *args, **kwargs):
+                self.field = field
+                super().__init__(*args, **kwargs)
+
+        with DAG("test_dag", default_args={"start_date": datetime.today()}):
+            op1 = DummyOperator(task_id="op1")
+            op2 = CustomOpSuperBefore(task_id="op2", field=op1.output)
+            op3 = CustomOpSuperAfter(task_id="op3", field=op1.output)
+
+        assert op1 in op2.upstream_list
+        assert op1 in op3.upstream_list
+        assert op2 in op1.downstream_list
+        assert op3 in op1.downstream_list
+

Review comment:
       We need checks for "operator defined outside of a DAG" and for PythonOperator (`op_kwargs`) usage.







[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r427252150



##########
File path: airflow/models/dag.py
##########
@@ -1468,6 +1468,10 @@ def create_dagrun(self,
         :param session: database session
         :type session: sqlalchemy.orm.session.Session
         """
+        # Resolve relationship between task set by XComArgs
+        for task in self.tasks:
+            task.set_xcomargs_dependencies()

Review comment:
       I removed this code. Everything works fine (dag rendering in web UI, running dags, backfill etc). The only problem is that code like this will not work anymore:
   ```
   if __name__ == '__main__':
       dag.run()
   ```
   







[GitHub] [airflow] turbaszek commented on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636327033


   > > Could we use the `__exit__` from the dag to fix this one case (but there are others that wouldn't be caught by that)?
   > 
   > Exactly
   > 
   > > Given how rare metaprogramming is in Python, I think the metaclass approach gives a more complete solution, it catches all possible ways of assigning deps between a DAG.
   > 
   > I agree, in our codebase, there's only one operator that will need to be fixed.
   > 
   > > Yes, this is technically a breaking change, but I think it will affect almost no one. And I think the only "break" would be that using XComArg wouldn't work with a custom operator, unless your metaclass inherits this new metaclass. Is that correct? (i.e. the dag would still run, the user's metaclass would be the one that is used.)
   > 
   > I think yes
   > 
   > > So I'm +1 for metaclass, if I've understood it all correctly.
   > 
   > Happy to bring it back!
   
   





[GitHub] [airflow] turbaszek edited a comment on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek edited a comment on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636777316


   > > Why? We don't want to do any custom action (including resolving upstream) during execution. And it may happen that someone will assign/reassign value to template field attribute in execute()
   > 
   > This is in fact _exactly_ what `task.render_template_fields()` does, so yes, we do want to set something like this.
   
   @ashb true, but I think we don't want to set upstreams when executing a task, right?





[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r422638768



##########
File path: airflow/models/baseoperator.py
##########
@@ -60,9 +60,25 @@
 ScheduleInterval = Union[str, timedelta, relativedelta]
 
 
+class BaseOperatorMeta(type):
+    """
+    Base metaclass of BaseOperator.
+    """
+    def __call__(cls, *args, **kwargs):
+        """
+        Called when you call BaseOperator(). In this way we are able to perform an action
+        after initializing an operator no matter where  the ``super().__init__`` is called
+        (before or after assign of new attributes in a custom operator).
+        """
+        obj = type.__call__(cls, *args, **kwargs)
+        # Set upstream task defined by XComArgs passed to template fields of an operator
+        obj._set_xcomargs_dependencies()  # pylint: disable=protected-access

Review comment:
       > meta_class introduce backward compatibility problems for users who already have metaclass defined for their operator ( they will have to inherit from BaseOperatorMeta)
   
   True
   
   > what happens when a user creates an operator without DAG?
   
   Well, nothing. If the question is about https://github.com/apache/airflow/pull/8805#discussion_r422633456 then AirflowException on missing DAG is raised. I've added a test for that.
   
   > 1. we can fix set_upstream
   
   Is this related to this comment https://github.com/apache/airflow/pull/8805#discussion_r422633321 ?
   
   > 2. we can just finalize Operator implementation when we add it to dag ( thus no metaclass is required)
   
   A user can either provide the `dag` argument to the operator or, if it's `None`, we call `DagContext.get_current_dag()`. Setting the DAG (via `@dag.setter`) is done during the `__init__` call, so I'm not sure I understand when the "add to dag" moment happens.
   
   







[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r422630352



##########
File path: airflow/models/baseoperator.py
##########
@@ -60,9 +60,25 @@
 ScheduleInterval = Union[str, timedelta, relativedelta]
 
 
+class BaseOperatorMeta(type):
+    """
+    Base metaclass of BaseOperator.
+    """
+    def __call__(cls, *args, **kwargs):
+        """
+        Called when you call BaseOperator(). In this way we are able to perform an action
+        after initializing an operator no matter where  the ``super().__init__`` is called
+        (before or after assign of new attributes in a custom operator).
+        """
+        obj = type.__call__(cls, *args, **kwargs)
+        # Set upstream task defined by XComArgs passed to template fields of an operator
+        obj._set_xcomargs_dependencies()  # pylint: disable=protected-access
+        return obj
+
+
 # pylint: disable=too-many-instance-attributes,too-many-public-methods
 @functools.total_ordering
-class BaseOperator(Operator, LoggingMixin):
+class BaseOperator(Operator, LoggingMixin, metaclass=BaseOperatorMeta):

Review comment:
       > Why metaclass instead of __init__?
   
   The `resolve` method has to work for any operator, no matter where someone calls `super().__init__(*args, **kwargs)`. In other words, `resolve` has to be called after initialization of an operator.
   
   Calling resolve in `__init__` works only when the custom operator does:
   ```python
   class CustomOp(BaseOperator):
       template_fields = ("custom_field",)
       def __init__(self, custom_field, *args, **kwargs):
           self.custom_field = custom_field
           super().__init__(*args, **kwargs)  # resolve happens here
   ```
   but not in the (much more common) case of:
   ```python
   class CustomOp(BaseOperator):
       template_fields = ("custom_field",)
       def __init__(self, custom_field, *args, **kwargs):
           super().__init__(*args, **kwargs)  # resolve happens here
           self.custom_field = custom_field  # this is not resolved
   ```
   
   This is also checked in tests. 
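
The ordering problem described above can be shown without Airflow. In the sketch below all class names and the `seen` attribute are hypothetical; `Base.__init__` records which template fields already exist at the moment it runs, and `PostInitMeta` repeats the same check after the whole `__init__` chain has finished.

```python
class Base:
    template_fields = ()

    def __init__(self):
        # "Resolve" inside __init__: only attributes that already exist are seen.
        self.seen = [f for f in self.template_fields if hasattr(self, f)]


class SuperAfter(Base):
    template_fields = ("custom_field",)

    def __init__(self, custom_field):
        self.custom_field = custom_field
        super().__init__()  # the field exists already, so it is seen


class SuperBefore(Base):
    template_fields = ("custom_field",)

    def __init__(self, custom_field):
        super().__init__()  # the field does not exist yet, so it is missed
        self.custom_field = custom_field


class PostInitMeta(type):
    def __call__(cls, *args, **kwargs):
        obj = super().__call__(*args, **kwargs)
        # Runs after the entire __init__ chain, regardless of super() placement.
        obj.seen = [f for f in obj.template_fields if hasattr(obj, f)]
        return obj


class MetaSuperBefore(SuperBefore, metaclass=PostInitMeta):
    pass


seen_after = SuperAfter("x").seen
seen_before = SuperBefore("x").seen
seen_meta = MetaSuperBefore("x").seen
```

Only the metaclass variant sees the field in both orderings, which is the behaviour the tests in this PR check for.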







[GitHub] [airflow] turbaszek commented on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636329264


   > we could let it work when setting _any_ attribute to an XComArg, which would be nice
   
   That would mean that if we set 20 attributes in `__init__` then we run `self.set_xcomargs_dependencies` 20 times. This was the reason for the if. 
   





[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r431860047



##########
File path: tests/models/test_baseoperator.py
##########
@@ -347,3 +350,67 @@ def test_lineage_composition(self):
         task4 = DummyOperator(task_id="op4", dag=dag)
         task4 > [inlet, outlet, extra]
         self.assertEqual(task4.get_outlet_defs(), [inlet, outlet, extra])
+
+
+@pytest.fixture(scope="class")
+def provide_test_dag_bag():
+    yield DagBag(dag_folder=TEST_DAGS_FOLDER, include_examples=False)
+
+
+class TestXComArgsRelationsAreResolved:
+    def test_upstream_is_set_when_template_field_is_xcomarg(
+        self, provide_test_dag_bag  # pylint: disable=redefined-outer-name
+    ):

Review comment:
       Right, thanks for checking this







[GitHub] [airflow] ashb commented on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636769916


   > Why? We don't want to do any custom action (including resolving upstream) during execution. And it may happen that someone will assign/reassign value to template field attribute in execute()
   
   This is in fact _exactly_ what `task.render_template_fields()` does, so yes, we do want to set something like this.





[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r422651378



##########
File path: airflow/models/baseoperator.py
##########
@@ -60,9 +60,25 @@
 ScheduleInterval = Union[str, timedelta, relativedelta]
 
 
+class BaseOperatorMeta(type):
+    """
+    Base metaclass of BaseOperator.
+    """
+    def __call__(cls, *args, **kwargs):
+        """
+        Called when you call BaseOperator(). In this way we are able to perform an action
+        after initializing an operator no matter where  the ``super().__init__`` is called
+        (before or after assign of new attributes in a custom operator).
+        """
+        obj = type.__call__(cls, *args, **kwargs)
+        # Set upstream task defined by XComArgs passed to template fields of an operator
+        obj._set_xcomargs_dependencies()  # pylint: disable=protected-access

Review comment:
       Additionally, I think we will have to introduce this metaclass now or later. Here's @mik-laj's PR where he tried to address the `@apply_defaults`:
   https://github.com/apache/airflow/pull/7450
   
   One of the reasons to do that (from [devlist thread](https://lists.apache.org/thread.html/r4cf535e5dec1fbf8330f3c86031519092ca4820b382d3738413b23cd%40%3Cdev.airflow.apache.org%3E)):
   > I saw that PR today and it made me realize I've not been using apply_defaults in much of my code! Look forward to this being an automatic operation via a MetaClass, great work :) 
   
   However, this point is very valid:
   > meta_class introduce backward compatibility problems for users who already have metaclass defined for their operator ( they will have to inherit from BaseOperatorMeta)
   
   I suppose that we can add some refactor step once we have 2.0 and https://github.com/apache/airflow/issues/8765







[GitHub] [airflow] turbaszek commented on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-638881162


   @ashb @kaxil @potiuk @mik-laj @dimberman @evgenyshulman @casassg happy to hear your opinion and move this on :)





[GitHub] [airflow] evgenyshulman commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
evgenyshulman commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r422693309



##########
File path: airflow/models/baseoperator.py
##########
@@ -60,9 +60,25 @@
 ScheduleInterval = Union[str, timedelta, relativedelta]
 
 
+class BaseOperatorMeta(type):
+    """
+    Base metaclass of BaseOperator.
+    """
+    def __call__(cls, *args, **kwargs):
+        """
+        Called when you call BaseOperator(). In this way we are able to perform an action
+        after initializing an operator no matter where  the ``super().__init__`` is called
+        (before or after assign of new attributes in a custom operator).
+        """
+        obj = type.__call__(cls, *args, **kwargs)
+        # Set upstream task defined by XComArgs passed to template fields of an operator
+        obj._set_xcomargs_dependencies()  # pylint: disable=protected-access

Review comment:
       For this specific case I would vote for a DagBag "post" action (fixing apply_defaults, on the other hand, probably should be done via a metaclass). People can assign values not only after `super().__init__` in a subclass, but also in the DAG definition file.
   Something like:
   a = Op()
   a.b = other.c
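
A rough sketch of what such a "post" action could look like, under the assumption that it simply walks all tasks once after the DAG file has been evaluated. `XComRef`, `Op` and `resolve_all` are hypothetical names for illustration and do not correspond to DagBag or BaseOperator APIs.

```python
class XComRef:
    """Hypothetical stand-in for XComArg."""

    def __init__(self, operator):
        self.operator = operator


class Op:
    template_fields = ("b",)

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.b = None

    @property
    def output(self):
        return XComRef(self)


def resolve_all(tasks):
    """Post-processing pass: link tasks based on the final attribute values."""
    for task in tasks:
        for name in task.template_fields:
            value = getattr(task, name, None)
            if isinstance(value, XComRef):
                task.upstream.add(value.operator.task_id)


other = Op("other")
a = Op("a")
a.b = other.output  # assigned in the "DAG file", long after __init__

resolve_all([other, a])
```

Because the pass runs once at the end, it does not matter whether the value was set in `__init__` or later in the definition file.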







[GitHub] [airflow] ashb commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r427280878



##########
File path: airflow/models/baseoperator.py
##########
@@ -634,6 +634,43 @@ def deps(self) -> Set[BaseTIDep]:
             NotPreviouslySkippedDep(),
         }
 
+    def set_xcomargs_dependencies(self) -> None:

Review comment:
       Hmmm, not too familiar with lineage yet.
   
   /cc @bolkedebruin if you have time to look at this?







[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r425100679



##########
File path: airflow/models/baseoperator.py
##########
@@ -1141,7 +1178,7 @@ def set_upstream(self, task_or_task_list: Union['BaseOperator', List['BaseOperat
 
     @property
     def output(self):

Review comment:
       As far as I know you can use `op.output["your key"]` because `XComArg` implements `__getitem__`. 
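
For readers unfamiliar with the pattern, here is a minimal sketch of how `__getitem__` can select a non-default key. `XComRef` is a hypothetical stand-in and the exact string it renders to is an assumption for illustration; only the default key name `return_value` comes from this thread.

```python
class XComRef:
    """Hypothetical stand-in for XComArg with a selectable key."""

    DEFAULT_KEY = "return_value"

    def __init__(self, task_id, key=DEFAULT_KEY):
        self.task_id = task_id
        self.key = key

    def __getitem__(self, key):
        # Indexing returns a new reference that points at another XCom key.
        return XComRef(self.task_id, key)

    def __str__(self):
        # Doubled braces in the f-string produce literal "{{" and "}}".
        return (
            f"{{{{ task_instance.xcom_pull("
            f"task_ids='{self.task_id}', key='{self.key}') }}}}"
        )


output = XComRef("op1")
custom = output["your key"]
rendered = str(output)
```

Here `output` keeps the default key, while `custom` refers to the same task with the key `"your key"`.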







[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r422643152



##########
File path: airflow/models/baseoperator.py
##########
@@ -60,9 +60,25 @@
 ScheduleInterval = Union[str, timedelta, relativedelta]
 
 
+class BaseOperatorMeta(type):
+    """
+    Base metaclass of BaseOperator.
+    """
+    def __call__(cls, *args, **kwargs):
+        """
+        Called when you call BaseOperator(). In this way we are able to perform an action
+        after initializing an operator no matter where  the ``super().__init__`` is called
+        (before or after assign of new attributes in a custom operator).
+        """
+        obj = type.__call__(cls, *args, **kwargs)
+        # Set upstream task defined by XComArgs passed to template fields of an operator
+        obj._set_xcomargs_dependencies()  # pylint: disable=protected-access

Review comment:
       We can try to resolve those dependencies when creating a `DagRun`







[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r422630352



##########
File path: airflow/models/baseoperator.py
##########
@@ -60,9 +60,25 @@
 ScheduleInterval = Union[str, timedelta, relativedelta]
 
 
+class BaseOperatorMeta(type):
+    """
+    Base metaclass of BaseOperator.
+    """
+    def __call__(cls, *args, **kwargs):
+        """
+        Called when you call BaseOperator(). In this way we are able to perform an action
+        after initializing an operator no matter where  the ``super().__init__`` is called
+        (before or after assign of new attributes in a custom operator).
+        """
+        obj = type.__call__(cls, *args, **kwargs)
+        # Set upstream task defined by XComArgs passed to template fields of an operator
+        obj._set_xcomargs_dependencies()  # pylint: disable=protected-access
+        return obj
+
+
 # pylint: disable=too-many-instance-attributes,too-many-public-methods
 @functools.total_ordering
-class BaseOperator(Operator, LoggingMixin):
+class BaseOperator(Operator, LoggingMixin, metaclass=BaseOperatorMeta):

Review comment:
       > Why metaclass instead of __init__?
   
   The `_set_xcomargs_dependencies` method has to work for any operator, no matter where someone calls `super().__init__(*args, **kwargs)`. In other words, `_set_xcomargs_dependencies` has to be called after initialization of an operator.
   
   Calling resolve in `__init__` works only when the custom operator does:
   ```python
   class CustomOp(BaseOperator):
       template_fields = ("custom_field",)
       def __init__(self, custom_field, *args, **kwargs):
           self.custom_field = custom_field
           super().__init__(*args, **kwargs)  # resolve happens here
   ```
   but not in the (much more common) case of:
   ```python
   class CustomOp(BaseOperator):
       template_fields = ("custom_field",)
       def __init__(self, custom_field, *args, **kwargs):
           super().__init__(*args, **kwargs)  # resolve happens here
           self.custom_field = custom_field  # this is not resolved
   ```
   
   This is also checked in tests. 







[GitHub] [airflow] evgenyshulman commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
evgenyshulman commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r422693890



##########
File path: airflow/models/baseoperator.py
##########
@@ -672,10 +672,20 @@ def _set_xcomargs_dependencies(self) -> None:
 
         """
         from airflow.models.xcom_arg import XComArg
-        for field in self.template_fields:
-            arg = getattr(self, field)
+
+        def apply_set_upstream(arg: Any):
             if isinstance(arg, XComArg):

Review comment:
       This traversal part is likely to be very "error prone". We are starting to traverse all user templated args without exception (old code is affected), so it may be that something fails and this feature becomes a "blocker" for updating to the new version.







[GitHub] [airflow] ashb commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r440037349



##########
File path: tests/test_utils/__init__.py
##########
@@ -20,3 +20,7 @@
 AIRFLOW_MAIN_FOLDER = os.path.realpath(
     os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir, os.pardir)
 )
+
+EXAMPLE_DAGS_FOLDER = os.path.join(
+    AIRFLOW_MAIN_FOLDER, "airflow", "example_dags"
+)

Review comment:
       ```suggestion
   ```
   
   unused now I think?







[GitHub] [airflow] turbaszek edited a comment on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek edited a comment on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636329264


   > we could let it work when setting _any_ attribute to an XComArg, which would be nice
   
   That would mean that if we set 20 attributes in `__init__` then we run `self.set_xcomargs_dependencies` 20 times. This was the reason for the if. 
   
   But more importantly, `self.set_xcomargs_dependencies` would be called before setting the value of the template field. For example:
   ```python
   class DummyOp(BaseOperator):
       template_fields = ("field",)

       def __init__(self, field):
           # Assigning x already calls self.set_xcomargs_dependencies,
           # which fails because self has no attribute `field` yet.
           self.x = 42
           self.field = field
   ```
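
The failure mode can be reproduced in isolation. The sketch below is not Airflow code; `EagerOp` and `resolve` are hypothetical, and `resolve` stands in for a dependency-resolution method that reads every template field on each assignment.

```python
class EagerOp:
    template_fields = ("field",)

    def __init__(self, field):
        self.x = 42        # triggers resolve() before `field` exists
        self.field = field

    def __setattr__(self, key, value):
        super().__setattr__(key, value)
        self.resolve()     # unconditional: runs on every assignment

    def resolve(self):
        for name in self.template_fields:
            getattr(self, name)  # raises AttributeError if not set yet


try:
    EagerOp("value")
    error = None
except AttributeError as err:
    error = str(err)
```

The very first assignment in `__init__` raises, which is why the resolution has to be deferred until the instance is fully constructed.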
   





[GitHub] [airflow] ashb commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r425012045



##########
File path: airflow/models/baseoperator.py
##########
@@ -1141,7 +1178,7 @@ def set_upstream(self, task_or_task_list: Union['BaseOperator', List['BaseOperat
 
     @property
     def output(self):

Review comment:
       What about XCom values other than the default `return_value` key?
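
One possible shape for addressing non-default keys, sketched with hypothetical names (`OutputRef` and `DEFAULT_KEY` are assumptions for illustration, not the API proposed in this PR): the object returned by `output` could support indexing, so that `task.output["some_key"]` refers to a different XCom key of the same task.

```python
from dataclasses import dataclass

DEFAULT_KEY = "return_value"  # name of the default key, as mentioned in the comment


@dataclass(frozen=True)
class OutputRef:
    """Hypothetical reference to one XCom value of a task."""

    task_id: str
    key: str = DEFAULT_KEY

    def __getitem__(self, key):
        # Indexing yields a new reference to another key of the same task.
        return OutputRef(self.task_id, key)


default_ref = OutputRef("generate_content")
rows_ref = default_ref["rows"]
```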







[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r433186683



##########
File path: tests/dags/test_xcomargs_dag.py
##########
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the XComArgs."""
+from datetime import datetime
+
+from airflow import DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.dates import days_ago
+from airflow.utils.decorators import apply_defaults
+
+
+class CustomOp(DummyOperator):
+    template_fields = ("field",)
+
+    @apply_defaults
+    def __init__(self, field=None, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.field = field
+
+
+args = {
+    'owner': 'airflow',
+    'start_date': days_ago(2),
+}
+
+
+def dummy(*args, **kwargs):
+    """Dummy function"""
+    return "pass"
+
+
+with DAG(dag_id='xcomargs_test_1', default_args={"start_date": datetime.today()}) as dag1:

Review comment:
       I will remove those tests. Once we know that the upstream is set for the DAG, I think we do not have to test anything at the DagBag or serialization level.







[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r422858966



##########
File path: airflow/models/baseoperator.py
##########
@@ -60,9 +60,25 @@
 ScheduleInterval = Union[str, timedelta, relativedelta]
 
 
+class BaseOperatorMeta(type):
+    """
+    Base metaclass of BaseOperator.
+    """
+    def __call__(cls, *args, **kwargs):
+        """
+        Called when you call BaseOperator(). In this way we are able to perform an action
+        after initializing an operator no matter where  the ``super().__init__`` is called
+        (before or after assign of new attributes in a custom operator).
+        """
+        obj = type.__call__(cls, *args, **kwargs)
+        # Set upstream task defined by XComArgs passed to template fields of an operator
+        obj._set_xcomargs_dependencies()  # pylint: disable=protected-access

Review comment:
       > I would vote for dagbag "post" action. 
   
   This would require some additional changes to make it work everywhere. For example, running `dag.run()` or `dag.create_dagrun()` doesn't create a DagBag, so the relation will not be set.
   
   > people can assign values not only after super().init in subclass, but also in the DAG definition file as well.
   
   IMHO we should discourage that; I can't find any reason why users would need to do it.
   







[GitHub] [airflow] casassg commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
casassg commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r431484377



##########
File path: tests/models/test_baseoperator.py
##########
@@ -347,3 +350,66 @@ def test_lineage_composition(self):
         task4 = DummyOperator(task_id="op4", dag=dag)
         task4 > [inlet, outlet, extra]
         self.assertEqual(task4.get_outlet_defs(), [inlet, outlet, extra])
+
+
+@pytest.fixture(scope="class")
+def provide_test_dag_bag():
+    yield DagBag(dag_folder=TEST_DAGS_FOLDER, include_examples=False)
+
+
+class TestXComArgsRelationsAreResolved:
+    def test_upstream_is_set_when_template_field_is_xcomarg(
+        self, provide_test_dag_bag  # pylint: disable=redefined-outer-name
+    ):
+        dag: DAG = provide_test_dag_bag.get_dag("xcomargs_test_1")
+        op1, op2 = sorted(dag.tasks, key=lambda t: t.task_id)
+
+        assert op1 in op2.upstream_list
+        assert op2 in op1.downstream_list
+
+    def test_set_xcomargs_dependencies_works_recursively(
+        self, provide_test_dag_bag  # pylint: disable=redefined-outer-name
+    ):
+        dag: DAG = provide_test_dag_bag.get_dag("xcomargs_test_2")
+        op1, op2, op3, op4 = sorted(dag.tasks, key=lambda t: t.task_id)
+
+        assert op1 in op3.upstream_list
+        assert op2 in op3.upstream_list
+        assert op1 in op4.upstream_list
+        assert op2 in op4.upstream_list
+
+    def test_set_xcomargs_dependencies_works_when_set_after_init(
+        self, provide_test_dag_bag  # pylint: disable=redefined-outer-name
+    ):
+        dag: DAG = provide_test_dag_bag.get_dag("xcomargs_test_3")
+        op1, op2, _ = sorted(dag.tasks, key=lambda t: t.task_id)
+
+        assert op1 in op2.upstream_list
+
+    def test_set_xcomargs_dependencies_no_error_when_outside_dag(self):
+        class CustomOp(DummyOperator):
+            template_fields = ("field",)
+
+            @apply_defaults
+            def __init__(self, field, *args, **kwargs):
+                super().__init__(*args, **kwargs)
+                self.field = field
+
+        op1 = DummyOperator(task_id="op1")
+        CustomOp(task_id="op2", field=op1.output)
+
+    def test_set_xcomargs_dependencies_when_creating_dagbag_with_serialization(self):
+        # Persist DAG
+        dag_id = "xcomargs_test_3"
+        dagbag = DagBag(TEST_DAGS_FOLDER, include_examples=False)

Review comment:
       nit: Why not use `provide_test_dag_bag` here as well?







[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r422630352



##########
File path: airflow/models/baseoperator.py
##########
@@ -60,9 +60,25 @@
 ScheduleInterval = Union[str, timedelta, relativedelta]
 
 
+class BaseOperatorMeta(type):
+    """
+    Base metaclass of BaseOperator.
+    """
+    def __call__(cls, *args, **kwargs):
+        """
+        Called when you call BaseOperator(). In this way we are able to perform an action
+        after initializing an operator no matter where  the ``super().__init__`` is called
+        (before or after assign of new attributes in a custom operator).
+        """
+        obj = type.__call__(cls, *args, **kwargs)
+        # Set upstream task defined by XComArgs passed to template fields of an operator
+        obj._set_xcomargs_dependencies()  # pylint: disable=protected-access
+        return obj
+
+
 # pylint: disable=too-many-instance-attributes,too-many-public-methods
 @functools.total_ordering
-class BaseOperator(Operator, LoggingMixin):
+class BaseOperator(Operator, LoggingMixin, metaclass=BaseOperatorMeta):

Review comment:
       > Why metaclass instead of __init__?
   
   The `_set_xcomargs_dependencies` method has to work for any operator, no matter where someone calls `super().__init__(*args, **kwargs)`. In other words, `resolve` has to be called after the operator is fully initialized.
   
   Calling resolve in `__init__` works only when the custom operator does:
   ```python
   class CustomOp(BaseOperator):
       template_fields = ("custom_field",)
       def __init__(self, custom_field, *args, **kwargs):
           self.custom_field = custom_field
           super().__init__(*args, **kwargs)  # resolve happens here
   ```
   but not in the following case (which is much more common):
   ```python
   class CustomOp(BaseOperator):
       template_fields = ("custom_field",)
       def __init__(self, custom_field, *args, **kwargs):
           super().__init__(*args, **kwargs)  # resolve happens here
           self.custom_field = custom_field  # this is not resolved
   ```
   
   This is also checked in tests. 
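
The ordering argument can be checked with a self-contained sketch that does not depend on Airflow. The names `PostInitMeta`, `Base`, `SuperFirst`, `SuperLast` and `post_init` are hypothetical stand-ins; only the `__call__` pattern mirrors the diff above. Because the metaclass hook runs after the whole `__init__` chain has returned, both call orders see the template field.

```python
class PostInitMeta(type):
    """Runs a hook after the complete __init__ chain has finished."""

    def __call__(cls, *args, **kwargs):
        obj = super().__call__(*args, **kwargs)  # __new__ plus every __init__
        obj.post_init()
        return obj


class Base(metaclass=PostInitMeta):
    template_fields = ()

    def __init__(self):
        self.seen = None

    def post_init(self):
        # Record the template field values visible once construction is done.
        self.seen = {name: getattr(self, name) for name in self.template_fields}


class SuperFirst(Base):
    template_fields = ("custom_field",)

    def __init__(self, custom_field):
        super().__init__()
        self.custom_field = custom_field  # assigned after super().__init__()


class SuperLast(Base):
    template_fields = ("custom_field",)

    def __init__(self, custom_field):
        self.custom_field = custom_field  # assigned before super().__init__()
        super().__init__()


first = SuperFirst("x")
last = SuperLast("y")
```

Had `post_init` been called from `Base.__init__` instead, `SuperFirst` would raise `AttributeError`, since `custom_field` would not exist yet.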








[GitHub] [airflow] evgenyshulman commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
evgenyshulman commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r422632621



##########
File path: airflow/models/baseoperator.py
##########
@@ -60,9 +60,25 @@
 ScheduleInterval = Union[str, timedelta, relativedelta]
 
 
+class BaseOperatorMeta(type):
+    """
+    Base metaclass of BaseOperator.
+    """
+    def __call__(cls, *args, **kwargs):
+        """
+        Called when you call BaseOperator(). In this way we are able to perform an action
+        after initializing an operator no matter where  the ``super().__init__`` is called
+        (before or after assign of new attributes in a custom operator).
+        """
+        obj = type.__call__(cls, *args, **kwargs)
+        # Set upstream task defined by XComArgs passed to template fields of an operator
+        obj._set_xcomargs_dependencies()  # pylint: disable=protected-access

Review comment:
        Using a metaclass introduces backward-compatibility problems for users who already have a metaclass defined for their operator (they will have to inherit from BaseOperatorMeta).
    
    What happens when a user creates an operator without a DAG?
    1. we can fix set_upstream
    2. we can just finalize the Operator implementation when we add it to a DAG (thus no metaclass is required)







[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r425099860



##########
File path: airflow/providers/google/cloud/operators/automl.py
##########
@@ -558,7 +558,7 @@ class AutoMLTablesUpdateDatasetOperator(BaseOperator):
     :type gcp_conn_id: str
     """
 
-    template_fields = ("dataset", "update_mask", "location", "project_id")
+    template_fields = ("dataset", "update_mask", "location")

Review comment:
       I will rebase; it was fixed in another PR.







[GitHub] [airflow] turbaszek commented on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-633894857


   @ashb @evgenyshulman @casassg I think all questions were addressed 





[GitHub] [airflow] evgenyshulman commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
evgenyshulman commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r422632621



##########
File path: airflow/models/baseoperator.py
##########
@@ -60,9 +60,25 @@
 ScheduleInterval = Union[str, timedelta, relativedelta]
 
 
+class BaseOperatorMeta(type):
+    """
+    Base metaclass of BaseOperator.
+    """
+    def __call__(cls, *args, **kwargs):
+        """
+        Called when you call BaseOperator(). In this way we are able to perform an action
+        after initializing an operator no matter where  the ``super().__init__`` is called
+        (before or after assign of new attributes in a custom operator).
+        """
+        obj = type.__call__(cls, *args, **kwargs)
+        # Set upstream task defined by XComArgs passed to template fields of an operator
+        obj._set_xcomargs_dependencies()  # pylint: disable=protected-access

Review comment:
        Using a metaclass introduces backward-compatibility problems for users who already have a metaclass defined for their operator (they will have to inherit from BaseOperatorMeta).
    
    What happens when a user creates an operator without a DAG?
    1. we can fix set_upstream
    2. we can just finalize the Operator implementation when we add it to a DAG (thus no metaclass is required)
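
The compatibility concern can be reproduced in plain Python. In this sketch `OperatorMeta`, `Operator`, `UserMeta` and `FixedMeta` are hypothetical stand-ins (not Airflow classes): a user class that already names an unrelated metaclass cannot subclass a base that has its own metaclass, unless the user's metaclass derives from the base's metaclass.

```python
class OperatorMeta(type):
    """Stand-in for a metaclass added to the operator base class."""


class Operator(metaclass=OperatorMeta):
    pass


class UserMeta(type):
    """A metaclass the user had before the base class gained one."""


try:
    # UserMeta and OperatorMeta are unrelated, so class creation fails.
    class Broken(Operator, metaclass=UserMeta):
        pass
except TypeError as err:
    conflict = str(err)
else:
    conflict = None


class FixedMeta(UserMeta, OperatorMeta):
    """The user's metaclass now also derives from the base metaclass."""


class Fixed(Operator, metaclass=FixedMeta):
    pass
```

The fix is small, but it is a change every affected user has to make, which is the backward-compatibility cost raised in the comment.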







[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r422858966



##########
File path: airflow/models/baseoperator.py
##########
@@ -60,9 +60,25 @@
 ScheduleInterval = Union[str, timedelta, relativedelta]
 
 
+class BaseOperatorMeta(type):
+    """
+    Base metaclass of BaseOperator.
+    """
+    def __call__(cls, *args, **kwargs):
+        """
+        Called when you call BaseOperator(). In this way we are able to perform an action
+        after initializing an operator no matter where  the ``super().__init__`` is called
+        (before or after assign of new attributes in a custom operator).
+        """
+        obj = type.__call__(cls, *args, **kwargs)
+        # Set upstream task defined by XComArgs passed to template fields of an operator
+        obj._set_xcomargs_dependencies()  # pylint: disable=protected-access

Review comment:
       > I would vote for dagbag "post" action. 
   
   This would require some additional changes to make it work everywhere. For example, running `dag.run()` or `dag.create_dagrun()` doesn't create a DagBag, so the relation will not be set.
   
   > people can assign values not only after super().init in subclass, but also in the DAG definition file as well.
   
   I am not sure I got it. To use `a.output` you first have to define `a`, so that is where we resolve the upstream for `a`. With the "functional" approach, users will have to define the upstream tasks of task `a` before they define `a`; otherwise they will not be able to use the `output` parameter.
   







[GitHub] [airflow] ashb commented on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-636784301


   > > > Why? We don't want to do any custom action (including resolving upstream) during execution. And it may happen that someone will assign/reassign value to template field attribute in execute()
   > > 
   > > 
   > > This is in fact _exactly_ what `task.render_template_fields()` does, so yes, we do want to set something like this.
   > 
   > @ashb true, but I think we don't want to set upstreams when executing a task, right?
   
   Yes, correct, at execution time we don't want to change anything (the upstreams would already be set from creating the task anyway)





[GitHub] [airflow] ashb commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r425009815



##########
File path: airflow/models/baseoperator.py
##########
@@ -634,6 +634,43 @@ def deps(self) -> Set[BaseTIDep]:
             NotPreviouslySkippedDep(),
         }
 
+    def set_xcomargs_dependencies(self) -> None:

Review comment:
       Does this function set lineage/inlets/outlets too? If not, it should, please.







[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r431580206



##########
File path: tests/models/test_baseoperator.py
##########
@@ -347,3 +350,66 @@ def test_lineage_composition(self):
         task4 = DummyOperator(task_id="op4", dag=dag)
         task4 > [inlet, outlet, extra]
         self.assertEqual(task4.get_outlet_defs(), [inlet, outlet, extra])
+
+
+@pytest.fixture(scope="class")
+def provide_test_dag_bag():
+    yield DagBag(dag_folder=TEST_DAGS_FOLDER, include_examples=False)
+
+
+class TestXComArgsRelationsAreResolved:
+    def test_upstream_is_set_when_template_field_is_xcomarg(
+        self, provide_test_dag_bag  # pylint: disable=redefined-outer-name
+    ):
+        dag: DAG = provide_test_dag_bag.get_dag("xcomargs_test_1")
+        op1, op2 = sorted(dag.tasks, key=lambda t: t.task_id)
+
+        assert op1 in op2.upstream_list
+        assert op2 in op1.downstream_list
+
+    def test_set_xcomargs_dependencies_works_recursively(
+        self, provide_test_dag_bag  # pylint: disable=redefined-outer-name
+    ):
+        dag: DAG = provide_test_dag_bag.get_dag("xcomargs_test_2")
+        op1, op2, op3, op4 = sorted(dag.tasks, key=lambda t: t.task_id)
+
+        assert op1 in op3.upstream_list
+        assert op2 in op3.upstream_list
+        assert op1 in op4.upstream_list
+        assert op2 in op4.upstream_list
+
+    def test_set_xcomargs_dependencies_works_when_set_after_init(
+        self, provide_test_dag_bag  # pylint: disable=redefined-outer-name
+    ):
+        dag: DAG = provide_test_dag_bag.get_dag("xcomargs_test_3")
+        op1, op2, _ = sorted(dag.tasks, key=lambda t: t.task_id)
+
+        assert op1 in op2.upstream_list
+
+    def test_set_xcomargs_dependencies_no_error_when_outside_dag(self):
+        class CustomOp(DummyOperator):
+            template_fields = ("field",)
+
+            @apply_defaults
+            def __init__(self, field, *args, **kwargs):
+                super().__init__(*args, **kwargs)
+                self.field = field
+
+        op1 = DummyOperator(task_id="op1")
+        CustomOp(task_id="op2", field=op1.output)
+
+    def test_set_xcomargs_dependencies_when_creating_dagbag_with_serialization(self):
+        # Persist DAG
+        dag_id = "xcomargs_test_3"
+        dagbag = DagBag(TEST_DAGS_FOLDER, include_examples=False)

Review comment:
       I want a new fresh DagBag and `provide_test_dag_bag` has a `class` scope.
   Edit: now I think about it I see no reason for that. 







[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r422648227



##########
File path: tests/models/test_baseoperator.py
##########
@@ -262,6 +263,33 @@ def test_email_on_actions(self):
         assert test_task.email_on_retry is False
         assert test_task.email_on_failure is True
 
+    def test_upstream_is_set_when_template_field_is_xcomarg(self):
+        class CustomOpSuperBefore(DummyOperator):
+            template_fields = ("field",)
+
+            @apply_defaults
+            def __init__(self, field, *args, **kwargs):
+                super().__init__(*args, **kwargs)
+                self.field = field
+
+        class CustomOpSuperAfter(DummyOperator):
+            template_fields = ("field",)
+
+            @apply_defaults
+            def __init__(self, field, *args, **kwargs):
+                self.field = field
+                super().__init__(*args, **kwargs)
+
+        with DAG("test_dag", default_args={"start_date": datetime.today()}):
+            op1 = DummyOperator(task_id="op1")
+            op2 = CustomOpSuperBefore(task_id="op2", field=op1.output)
+            op3 = CustomOpSuperAfter(task_id="op3", field=op1.output)
+
+        assert op1 in op2.upstream_list
+        assert op1 in op3.upstream_list
+        assert op2 in op1.downstream_list
+        assert op3 in op1.downstream_list
+

Review comment:
       Added

##########
File path: airflow/models/baseoperator.py
##########
@@ -634,6 +651,33 @@ def deps(self) -> Set[BaseTIDep]:
             NotPreviouslySkippedDep(),
         }
 
+    def _set_xcomargs_dependencies(self) -> None:
+        """
+        Resolves upstream dependencies of a task. In this way passing an ``XComArg``
+        as value for a template field will result in creating upstream relation between
+        two tasks.
+
+        **Example**: ::
+
+            with DAG(...):
+                generate_content = GenerateContentOperator(task_id="generate_content")
+                send_email = EmailOperator(..., html_content=generate_content.output)
+
+            # This is equivalent to
+            with DAG(...):
+                generate_content = GenerateContentOperator(task_id="generate_content")
+                send_email = EmailOperator(
+                    ..., html_content="{{ task_instance.xcom_pull('generate_content') }}"
+                )
+                generate_content >> send_email
+
+        """
+        from airflow.models.xcom_arg import XComArg
+        for field in self.template_fields:
+            arg = getattr(self, field)
+            if isinstance(arg, XComArg):
+                self.set_upstream(arg.operator)

Review comment:
       Added







[GitHub] [airflow] ashb commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r440030963



##########
File path: airflow/models/baseoperator.py
##########
@@ -633,6 +672,55 @@ def deps(self) -> Set[BaseTIDep]:
             NotPreviouslySkippedDep(),
         }
 
+    def prepare_for_execution(self) -> "BaseOperator":
+        """
+        Lock task for execution to disable custom action in __setattr__ and
+        returns a copy of the task
+        """
+        self._lock_for_execution = True
+        return copy.copy(self)

Review comment:
       ```suggestion
           other = copy.copy(self)
           other._lock_for_execution = True
           return other
   ```
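
The difference between the two orderings can be seen in a small stand-alone sketch. `ToyTask` and its two method names are hypothetical; only the `_lock_for_execution` flag and the use of `copy.copy` come from the diff and the suggestion above. Setting the flag before copying also locks the original task, while copying first leaves the original untouched.

```python
import copy


class ToyTask:
    """Toy stand-in for an operator with an execution lock flag."""

    def __init__(self):
        self._lock_for_execution = False

    def prepare_lock_then_copy(self):
        # Ordering from the diff: the original is mutated as a side effect.
        self._lock_for_execution = True
        return copy.copy(self)

    def prepare_copy_then_lock(self):
        # Ordering from the suggestion: only the copy is locked.
        other = copy.copy(self)
        other._lock_for_execution = True
        return other


task_a = ToyTask()
copy_a = task_a.prepare_lock_then_copy()

task_b = ToyTask()
copy_b = task_b.prepare_copy_then_lock()
```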
   







[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r431580206



##########
File path: tests/models/test_baseoperator.py
##########
@@ -347,3 +350,66 @@ def test_lineage_composition(self):
         task4 = DummyOperator(task_id="op4", dag=dag)
         task4 > [inlet, outlet, extra]
         self.assertEqual(task4.get_outlet_defs(), [inlet, outlet, extra])
+
+
+@pytest.fixture(scope="class")
+def provide_test_dag_bag():
+    yield DagBag(dag_folder=TEST_DAGS_FOLDER, include_examples=False)
+
+
+class TestXComArgsRelationsAreResolved:
+    def test_upstream_is_set_when_template_field_is_xcomarg(
+        self, provide_test_dag_bag  # pylint: disable=redefined-outer-name
+    ):
+        dag: DAG = provide_test_dag_bag.get_dag("xcomargs_test_1")
+        op1, op2 = sorted(dag.tasks, key=lambda t: t.task_id)
+
+        assert op1 in op2.upstream_list
+        assert op2 in op1.downstream_list
+
+    def test_set_xcomargs_dependencies_works_recursively(
+        self, provide_test_dag_bag  # pylint: disable=redefined-outer-name
+    ):
+        dag: DAG = provide_test_dag_bag.get_dag("xcomargs_test_2")
+        op1, op2, op3, op4 = sorted(dag.tasks, key=lambda t: t.task_id)
+
+        assert op1 in op3.upstream_list
+        assert op2 in op3.upstream_list
+        assert op1 in op4.upstream_list
+        assert op2 in op4.upstream_list
+
+    def test_set_xcomargs_dependencies_works_when_set_after_init(
+        self, provide_test_dag_bag  # pylint: disable=redefined-outer-name
+    ):
+        dag: DAG = provide_test_dag_bag.get_dag("xcomargs_test_3")
+        op1, op2, _ = sorted(dag.tasks, key=lambda t: t.task_id)
+
+        assert op1 in op2.upstream_list
+
+    def test_set_xcomargs_dependencies_no_error_when_outside_dag(self):
+        class CustomOp(DummyOperator):
+            template_fields = ("field",)
+
+            @apply_defaults
+            def __init__(self, field, *args, **kwargs):
+                super().__init__(*args, **kwargs)
+                self.field = field
+
+        op1 = DummyOperator(task_id="op1")
+        CustomOp(task_id="op2", field=op1.output)
+
+    def test_set_xcomargs_dependencies_when_creating_dagbag_with_serialization(self):
+        # Persist DAG
+        dag_id = "xcomargs_test_3"
+        dagbag = DagBag(TEST_DAGS_FOLDER, include_examples=False)

Review comment:
       I want a fresh DagBag here, and `provide_test_dag_bag` has `class` scope, so it would be shared with the other tests in this class.







[GitHub] [airflow] turbaszek commented on pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#issuecomment-630728406


   @ash @kaxil @evgenyshulman are we still sure that this approach is better than having a metaclass?





[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r427293679



##########
File path: airflow/models/baseoperator.py
##########
@@ -634,6 +650,43 @@ def deps(self) -> Set[BaseTIDep]:
             NotPreviouslySkippedDep(),
         }
 
+    def _set_xcomargs_dependencies(self) -> None:
+        """
+        Resolves upstream dependencies of a task. In this way passing an ``XComArg``
+        as value for a template field will result in creating upstream relation between
+        two tasks.
+
+        **Example**: ::
+
+            with DAG(...):
+                generate_content = GenerateContentOperator(task_id="generate_content")
+                send_email = EmailOperator(..., html_content=generate_content.output)
+
+            # This is equivalent to
+            with DAG(...):
+                generate_content = GenerateContentOperator(task_id="generate_content")
+                send_email = EmailOperator(
+                    ..., html_content="{{ task_instance.xcom_pull('generate_content') }}"
+                )
+                generate_content >> send_email
+
+        """
+        from airflow.models.xcom_arg import XComArg
+
+        def apply_set_upstream(arg: Any):
+            if isinstance(arg, XComArg):
+                self.set_upstream(arg.operator)
+            elif isinstance(arg, (tuple, set, list)):
+                for elem in arg:
+                    apply_set_upstream(elem)
+            elif isinstance(arg, dict):

Review comment:
       @ashb sorry, I thought I did this last week. Fixed
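
The recursive walk in the hunk above can be illustrated outside Airflow. This is only a sketch: `Ref` and `collect_upstream` are hypothetical names standing in for `XComArg` and `apply_set_upstream`, and because the hunk is cut off at the `dict` branch, visiting only the dictionary values is an assumption, not the PR's confirmed behavior.

```python
from typing import Any, List


class Ref:
    """Hypothetical stand-in for XComArg: remembers the task that produced it."""

    def __init__(self, operator: str):
        self.operator = operator


def collect_upstream(arg: Any, found: List[str]) -> List[str]:
    """Walk nested containers and record every producing task once."""
    if isinstance(arg, Ref):
        if arg.operator not in found:
            found.append(arg.operator)
    elif isinstance(arg, (tuple, set, list)):
        for elem in arg:
            collect_upstream(elem, found)
    elif isinstance(arg, dict):
        # Assumption: only the values are walked; the hunk above is truncated here.
        for elem in arg.values():
            collect_upstream(elem, found)
    # Any other type (str, int, None, ...) carries no dependency and is ignored.
    return found


fields = {"a": [Ref("op1"), "plain"], "b": (Ref("op2"), {"c": Ref("op1")})}
upstream = collect_upstream(fields, [])
```

In the PR the `Ref` branch calls `self.set_upstream(arg.operator)` instead of appending to a list; the list is used here only so the result can be inspected.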







[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r433171502



##########
File path: airflow/models/taskinstance.py
##########
@@ -968,6 +968,7 @@ def _run_raw_task(
                 context = self.get_template_context()
 
                 task_copy = copy.copy(task)
+                task_copy.lock_for_execution()

Review comment:
       Awesome idea! I was thinking about adding this lock somewhere but couldn't come up with a good place for it.







[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8805:
URL: https://github.com/apache/airflow/pull/8805#discussion_r422651378



##########
File path: airflow/models/baseoperator.py
##########
@@ -60,9 +60,25 @@
 ScheduleInterval = Union[str, timedelta, relativedelta]
 
 
+class BaseOperatorMeta(type):
+    """
+    Base metaclass of BaseOperator.
+    """
+    def __call__(cls, *args, **kwargs):
+        """
+        Called when you call BaseOperator(). In this way we are able to perform an action
+        after initializing an operator no matter where the ``super().__init__`` is called
+        (before or after assigning new attributes in a custom operator).
+        """
+        obj = type.__call__(cls, *args, **kwargs)
+        # Set upstream task defined by XComArgs passed to template fields of an operator
+        obj._set_xcomargs_dependencies()  # pylint: disable=protected-access

Review comment:
       > 2. we can just finalize Operator implementation when we add it to dag ( thus no metaclass is required)
   
   A user can either pass the `dag` argument to the operator or, if it is `None`, we call `DagContext.get_current_dag()`. Setting the DAG (via `@dag.setter`) happens during the `__init__` call, so I'm not sure I understand when the "add to dag" moment happens.
   
   Additionally, I think we will have to introduce this metaclass sooner or later. Here's @mik-laj's PR where he tried to address `@apply_defaults`:
   https://github.com/apache/airflow/pull/7450
   
   One of the reasons to do that (from [devlist thread](https://lists.apache.org/thread.html/r4cf535e5dec1fbf8330f3c86031519092ca4820b382d3738413b23cd%40%3Cdev.airflow.apache.org%3E)):
   > I saw that PR today and it made me realize I've not been using apply_defaults in much of my code! Look forward to this being an automaic operation via a MetaClass, great work :) 
   
   However, this point is very valid:
   > meta_class introduce backward compatibility problems for users who already have metaclass defined for their operator ( they will have to inherit from BaseOperatorMeta)
   
   I suppose we can add a refactoring step once we have 2.0 and https://github.com/apache/airflow/issues/8765.
   
   Of course we can try to add this in `DagBag.bag_dag` (like policy):
   https://github.com/apache/airflow/blob/e1cc17e84856bd121724c634cb1809581c9c97db/airflow/models/dagbag.py#L322-L323
   or in `DAG.create_dagrun`, but in my opinion this is not the best approach. Happy to discuss, @evgenyshulman @potiuk @kaxil.
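
To make the trade-off discussed above concrete, here is a minimal sketch of the metaclass `__call__` technique outside Airflow. All names (`PostInitMeta`, `Base`, `_post_init`, `LateAssign`, `EarlyAssign`, `UserMeta`) are hypothetical and not part of the PR. The sketch shows that the hook sees the fully initialized object wherever `super().__init__()` is called, and that a user-defined metaclass which does not inherit from the base metaclass is rejected by Python.

```python
class PostInitMeta(type):
    """Hypothetical metaclass: runs a hook after the whole __init__ chain."""

    def __call__(cls, *args, **kwargs):
        obj = super().__call__(*args, **kwargs)  # runs __new__ and __init__
        obj._post_init()
        return obj


class Base(metaclass=PostInitMeta):
    template_fields = ()

    def __init__(self):
        self.seen = None

    def _post_init(self):
        # Snapshot the template fields once every subclass __init__ has finished.
        self.seen = {name: getattr(self, name) for name in self.template_fields}


class LateAssign(Base):
    template_fields = ("field",)

    def __init__(self, field):
        super().__init__()  # called BEFORE the attribute exists
        self.field = field


class EarlyAssign(Base):
    template_fields = ("field",)

    def __init__(self, field):
        self.field = field
        super().__init__()  # called AFTER the attribute is set


class UserMeta(type):
    """A user's own metaclass that does not inherit from PostInitMeta."""


def metaclass_conflicts() -> bool:
    """Return True if combining Base with an unrelated metaclass fails."""
    try:
        class Broken(Base, metaclass=UserMeta):
            pass
    except TypeError:  # "metaclass conflict"
        return True
    return False
```

Both `LateAssign("a").seen` and `EarlyAssign("b").seen` contain the assigned value, which is the property the `__call__` hook is meant to guarantee. `metaclass_conflicts()` illustrates the backward-compatibility concern quoted above: the user's metaclass would have to inherit from the base metaclass.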



