Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/01/07 10:27:57 UTC

[GitHub] [airflow] ashb opened a new pull request #20743: Serialize mapped tasks and task groups

ashb opened a new pull request #20743:
URL: https://github.com/apache/airflow/pull/20743


   This is needed so that the scheduler can expand the task instances at runtime.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r785706252



##########
File path: airflow/models/baseoperator.py
##########
@@ -207,6 +207,7 @@ def apply_defaults(self: "BaseOperator", *args: Any, **kwargs: Any) -> Any:
 
             result = func(self, **kwargs, default_args=default_args)
             # Store the args passed to init -- we need them to support task.map serialzation!
+            kwargs.pop('task_id', None)

Review comment:
       Hmm shouldn’t this be fixed on `partial_kwargs` instead then? I’m surprised this doesn’t cause problems elsewhere. Or is `task_id` being present here wrong in the first place and should be generally fixed? (In that case we should add a comment.)







[GitHub] [airflow] ashb commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r786144848



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -653,10 +677,14 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
                 v = cls._deserialize(v)
             # else use v as it is
 
-            setattr(op, k, v)
+            if hasattr(op, k) and isinstance(v, set):
+                getattr(op, k).update(v)

Review comment:
       Changed this (and tested that we still correctly inflate the old name).







[GitHub] [airflow] ashb commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r781162445



##########
File path: airflow/models/baseoperator.py
##########
@@ -1634,7 +1635,7 @@ def map(self, **kwargs) -> "MappedOperator":
 
 
 def _validate_kwarg_names_for_mapping(cls: Type[BaseOperator], func_name: str, value: Dict[str, Any]):
-    if isinstance(str, cls):
+    if isinstance(cls, str):

Review comment:
       Yes, it should be -- I corrected that type annotation on the MappedOperator but not here.







[GitHub] [airflow] ashb commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r784799450



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -586,9 +604,24 @@ def serialize_operator(cls, op: BaseOperator) -> Dict[str, Any]:
         return serialize_op
 
     @classmethod
-    def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
+    def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> Union[BaseOperator, MappedOperator]:
         """Deserializes an operator from a JSON object."""
-        op = SerializedBaseOperator(task_id=encoded_op['task_id'])
+        op: Union[BaseOperator, MappedOperator]
+        # Check if it's a mapped operator
+        if "mapped_kwargs" in encoded_op:
+            op = MappedOperator(
+                task_id=encoded_op['task_id'],
+                dag=None,
+                operator_class='.'.join(filter(None, (encoded_op['_task_module'], encoded_op['_task_type']))),
+                # These are all re-set later
+                partial_kwargs={},

Review comment:
       No, because in all other situations where this is constructed we need to pass a value.







[GitHub] [airflow] kaxil commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r784797652



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -586,9 +604,24 @@ def serialize_operator(cls, op: BaseOperator) -> Dict[str, Any]:
         return serialize_op
 
     @classmethod
-    def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
+    def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> Union[BaseOperator, MappedOperator]:
         """Deserializes an operator from a JSON object."""
-        op = SerializedBaseOperator(task_id=encoded_op['task_id'])
+        op: Union[BaseOperator, MappedOperator]
+        # Check if it's a mapped operator
+        if "mapped_kwargs" in encoded_op:
+            op = MappedOperator(
+                task_id=encoded_op['task_id'],
+                dag=None,
+                operator_class='.'.join(filter(None, (encoded_op['_task_module'], encoded_op['_task_type']))),

Review comment:
       Why not just the following?
   ```suggestion
                   operator_class=f"{encoded_op['_task_module']}.{encoded_op['_task_type']}",
   ```
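A minimal sketch of how the two spellings differ. The thread does not say why `filter(None, ...)` was used; one plausible reason, assumed here, is that the module part can be empty or missing. The helper names below are hypothetical and are not part of Airflow.

```python
def join_skipping_empty(module, type_name):
    # Drops falsy parts (None or "") before joining, as in the PR's expression.
    return '.'.join(filter(None, (module, type_name)))


def join_with_fstring(module, type_name):
    # Always emits both parts and the dot, whatever their values are.
    return f"{module}.{type_name}"


full = join_skipping_empty("airflow.operators.bash", "BashOperator")
no_module_filtered = join_skipping_empty(None, "BashOperator")
no_module_fstring = join_with_fstring(None, "BashOperator")
```

When both parts are present the two forms give the same string; they only diverge when the module is empty or `None`.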







[GitHub] [airflow] ashb commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r784816104



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -586,9 +604,24 @@ def serialize_operator(cls, op: BaseOperator) -> Dict[str, Any]:
         return serialize_op
 
     @classmethod
-    def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
+    def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> Union[BaseOperator, MappedOperator]:
         """Deserializes an operator from a JSON object."""
-        op = SerializedBaseOperator(task_id=encoded_op['task_id'])
+        op: Union[BaseOperator, MappedOperator]
+        # Check if it's a mapped operator
+        if "mapped_kwargs" in encoded_op:

Review comment:
       🤦🏻 







[GitHub] [airflow] ashb commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r786179940



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -653,10 +677,14 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
                 v = cls._deserialize(v)
             # else use v as it is
 
-            setattr(op, k, v)
+            if hasattr(op, k) and isinstance(v, set):
+                getattr(op, k).update(v)

Review comment:
       Which change? It's now this
   
   ```
               if k == "_downstream_task_ids":
                   # Upgrade from old format/name
                   k = "downstream_task_ids"
   ```
   
   And then the set-except-get-update special case is gone.
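The key upgrade described above can be sketched in isolation as follows. This is an illustration only: `upgrade_keys` and the `LEGACY_KEYS` table are hypothetical names, and the real deserializer does more than rename keys.

```python
# Hypothetical mapping from old serialized key names to current ones.
LEGACY_KEYS = {"_downstream_task_ids": "downstream_task_ids"}


def upgrade_keys(encoded_op):
    """Return a copy of the encoded operator with legacy key names upgraded."""
    return {LEGACY_KEYS.get(key, key): value for key, value in encoded_op.items()}


old_format = {"task_id": "a", "_downstream_task_ids": ["b", "c"]}
new_format = {"task_id": "a", "downstream_task_ids": ["b", "c"]}

upgraded_old = upgrade_keys(old_format)
upgraded_new = upgrade_keys(new_format)
```

Both the old and the new spelling end up under the same attribute name, so the code after the rename only has to handle one case.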







[GitHub] [airflow] ashb commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r786183131



##########
File path: airflow/models/baseoperator.py
##########
@@ -207,6 +207,7 @@ def apply_defaults(self: "BaseOperator", *args: Any, **kwargs: Any) -> Any:
 
             result = func(self, **kwargs, default_args=default_args)
             # Store the args passed to init -- we need them to support task.map serialzation!
+            kwargs.pop('task_id', None)

Review comment:
       Done in https://github.com/apache/airflow/pull/20743/commits/9439378183cb4da70e33e9c6f04d2d52924709a5 -- and a bit of a justification/reason in the commit message







[GitHub] [airflow] uranusjr commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r781930519



##########
File path: airflow/models/baseoperator.py
##########
@@ -207,6 +207,7 @@ def apply_defaults(self: "BaseOperator", *args: Any, **kwargs: Any) -> Any:
 
             result = func(self, **kwargs, default_args=default_args)
             # Store the args passed to init -- we need them to support task.map serialzation!
+            kwargs.pop('task_id', None)

Review comment:
       I guess my question is more: why do we need to treat `task_id` specially (and why didn’t we need to previously, but only now that we add operator mapping)?







[GitHub] [airflow] uranusjr commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r780892938



##########
File path: airflow/models/baseoperator.py
##########
@@ -1634,7 +1635,7 @@ def map(self, **kwargs) -> "MappedOperator":
 
 
 def _validate_kwarg_names_for_mapping(cls: Type[BaseOperator], func_name: str, value: Dict[str, Any]):
-    if isinstance(str, cls):
+    if isinstance(cls, str):

Review comment:
       LOL how did this escape review
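For readers wondering how the swapped arguments could go unnoticed, here is a small sketch using a stand-in class (not Airflow's `BaseOperator`). With a class argument the swapped call quietly returns `False`; it only fails loudly once a string is passed.

```python
class StandInOperator:
    """Stand-in class used only for this illustration."""


def swapped_check(cls):
    # Buggy order: asks whether the type `str` is an instance of `cls`.
    return isinstance(str, cls)


def fixed_check(cls):
    # Intended order: asks whether `cls` is a string.
    return isinstance(cls, str)


# With a class, the swapped check is silently False.
swapped_with_class = swapped_check(StandInOperator)

# With a string, the swapped check raises, because a str is not a valid
# second argument to isinstance().
try:
    swapped_check("airflow.operators.bash.BashOperator")
    swapped_with_string = "no error"
except TypeError as exc:
    swapped_with_string = type(exc).__name__

fixed_with_class = fixed_check(StandInOperator)
fixed_with_string = fixed_check("airflow.operators.bash.BashOperator")
```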










[GitHub] [airflow] uranusjr commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r786181173



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -653,10 +677,14 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
                 v = cls._deserialize(v)
             # else use v as it is
 
-            setattr(op, k, v)
+            if hasattr(op, k) and isinstance(v, set):
+                getattr(op, k).update(v)

Review comment:
       Around here: https://github.com/apache/airflow/pull/20743/files#diff-807ca0a4fd53aeb41166621c9842b0f89b7931fc64e9a60befa36c776db45efaL661-L662







[GitHub] [airflow] github-actions[bot] commented on pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#issuecomment-1014769681


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] kaxil commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r784796827



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -586,9 +604,24 @@ def serialize_operator(cls, op: BaseOperator) -> Dict[str, Any]:
         return serialize_op
 
     @classmethod
-    def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
+    def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> Union[BaseOperator, MappedOperator]:
         """Deserializes an operator from a JSON object."""
-        op = SerializedBaseOperator(task_id=encoded_op['task_id'])
+        op: Union[BaseOperator, MappedOperator]
+        # Check if it's a mapped operator
+        if "mapped_kwargs" in encoded_op:

Review comment:
       The following might be a better condition?
   ```suggestion
           if encoded_op.get("_is_mapped"):
   ```







[GitHub] [airflow] ashb commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r783404688



##########
File path: airflow/models/baseoperator.py
##########
@@ -207,6 +207,7 @@ def apply_defaults(self: "BaseOperator", *args: Any, **kwargs: Any) -> Any:
 
             result = func(self, **kwargs, default_args=default_args)
             # Store the args passed to init -- we need them to support task.map serialzation!
+            kwargs.pop('task_id', None)

Review comment:
       `task_id` was getting stored in `partial_kwargs`, which I hadn't noticed until I looked at the serialized representation of a MappedOperator.
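A rough sketch of the behaviour being discussed, under stated assumptions: the decorator name `record_init_kwargs`, the attribute `init_kwargs`, and the class `SketchOperator` are all hypothetical, and this is not how Airflow's `apply_defaults` is actually written. It only shows the idea of remembering the constructor's keyword arguments while keeping `task_id` out of them.

```python
import functools


def record_init_kwargs(init):
    """Hypothetical decorator: remember the keyword arguments given to __init__."""

    @functools.wraps(init)
    def wrapper(self, **kwargs):
        init(self, **kwargs)
        stored = dict(kwargs)
        # task_id is tracked on its own, so keep it out of the stored kwargs.
        stored.pop("task_id", None)
        self.init_kwargs = stored

    return wrapper


class SketchOperator:
    @record_init_kwargs
    def __init__(self, task_id, bash_command, retries=0):
        self.task_id = task_id
        self.bash_command = bash_command
        self.retries = retries


op = SketchOperator(task_id="t1", bash_command="echo hi")
```

Unlike the one-line change in the diff, this sketch copies the dictionary before popping, so the caller's `kwargs` are left untouched.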







[GitHub] [airflow] uranusjr commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r780938792



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -623,7 +647,7 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
 
         for k, v in encoded_op.items():
 
-            if k == "_downstream_task_ids":
+            if k in {"_downstream_task_ids", "downstream_task_ids"}:

Review comment:
       Maybe we should rename `downstream_task_ids` to `_downstream_task_ids` instead? This feels a bit awkward.

##########
File path: airflow/models/baseoperator.py
##########
@@ -1741,6 +1788,28 @@ def leaves(self) -> List["MappedOperator"]:
     def has_dag(self):
         return self.dag is not None
 
+    @property
+    def inherits_from_dummy_operator(self):
+        """Used to determine if an Operator is inherited from DummyOperator"""
+        # This looks like `isinstance(self, DummyOperator) would work, but this also
+        # needs to cope when `self` is a Serialized instance of a DummyOperator or one
+        # of its sub-classes (which don't inherit from anything but BaseOperator).
+        return getattr(self, '_is_dummy', False)

Review comment:
       `_is_dummy` has a default, so when can it be unset?

##########
File path: airflow/models/baseoperator.py
##########
@@ -207,6 +207,7 @@ def apply_defaults(self: "BaseOperator", *args: Any, **kwargs: Any) -> Any:
 
             result = func(self, **kwargs, default_args=default_args)
             # Store the args passed to init -- we need them to support task.map serialzation!
+            kwargs.pop('task_id', None)

Review comment:
       I don’t get what this implies.

##########
File path: airflow/models/baseoperator.py
##########
@@ -1663,24 +1664,64 @@ def _validate_kwarg_names_for_mapping(cls: Type[BaseOperator], func_name: str, v
         raise TypeError(f'{cls.__name__}.{func_name} got unexpected keyword arguments {names}')
 
 
-@attr.define(kw_only=True)
+def _MappedOperator_minimal_repr(cls, fields):
+    results = []
+    fields = iter(fields)
+    for field in fields:
+        results.append(field)
+        if field.name == "dag":
+            # Everything after 'dag' attribute is exluced form repr
+            break
+
+    for field in fields:
+        results.append(field.evolve(repr=False))
+    return results

Review comment:
       This feels much too magical to me 🙁 Why not add `repr=False` explicitly instead, or implement a custom `__repr__`?
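As an illustration of the "explicit `repr=False`" alternative, here is a sketch using the standard library's `dataclasses`, whose `field(repr=False)` plays the same role as the `repr=False` argument in attrs. The class and its fields are hypothetical and much smaller than the real `MappedOperator`.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class MappedSketch:
    # Shown in the generated repr.
    operator_class: str
    task_id: str
    dag: Optional[str] = None
    # Explicitly excluded from the generated repr, one field at a time.
    params: Dict[str, Any] = field(default_factory=dict, repr=False)
    owner: str = field(default="airflow", repr=False)


text = repr(MappedSketch(operator_class="BashOperator", task_id="t1"))
```

The cost is one extra argument per field, but a reader can see at the field definition whether it appears in the repr, without having to know about a field-rewriting hook.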

##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -653,10 +677,14 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
                 v = cls._deserialize(v)
             # else use v as it is
 
-            setattr(op, k, v)
+            if hasattr(op, k) and isinstance(v, set):
+                getattr(op, k).update(v)
+            else:
+                setattr(op, k, v)
 
         for k in op.get_serialized_fields() - encoded_op.keys() - cls._CONSTRUCTOR_PARAMS.keys():
-            setattr(op, k, None)
+            if not hasattr(op, k):
+                setattr(op, k, None)

Review comment:
       Same question for this.

##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -653,10 +677,14 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
                 v = cls._deserialize(v)
             # else use v as it is
 
-            setattr(op, k, v)
+            if hasattr(op, k) and isinstance(v, set):
+                getattr(op, k).update(v)

Review comment:
       What does this cover? Too hacky IMO.







[GitHub] [airflow] kaxil commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r784788806



##########
File path: airflow/models/baseoperator.py
##########
@@ -1741,6 +1788,28 @@ def leaves(self) -> List["MappedOperator"]:
     def has_dag(self):
         return self.dag is not None
 
+    @property
+    def inherits_from_dummy_operator(self):
+        """Used to determine if an Operator is inherited from DummyOperator"""
+        # This looks like `isinstance(self, DummyOperator) would work, but this also
+        # needs to cope when `self` is a Serialized instance of a DummyOperator or one
+        # of its sub-classes (which don't inherit from anything but BaseOperator).
+        return getattr(self, '_is_dummy', False)

Review comment:
       `SerializedBaseOperator` inherits from `BaseOperator` too, so `_is_dummy` should already be set.
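The point about the default can be shown with stand-in classes (these are not the Airflow classes, and the attribute layout is assumed for illustration): when the base class defines the attribute, every subclass instance already has it, so plain attribute access behaves the same as `getattr` with a fallback.

```python
class Base:
    # Class-level default, inherited by every subclass.
    _is_dummy = False


class Dummy(Base):
    _is_dummy = True


class Serialized(Base):
    """Stand-in for a deserialized operator that only inherits from Base."""


serialized = Serialized()
plain_access = serialized._is_dummy
with_fallback = getattr(serialized, "_is_dummy", False)

# Deserialization can still override the default on the instance.
serialized._is_dummy = True
after_override = serialized._is_dummy
```

The `getattr` fallback only matters for objects that do not inherit from a class defining the attribute.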








[GitHub] [airflow] ashb commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r784799141



##########
File path: airflow/models/baseoperator.py
##########
@@ -1672,20 +1677,52 @@ def _validate_kwarg_names_for_mapping(cls: Type[BaseOperator], func_name: str, v
 class MappedOperator(DAGNode):
     """Object representing a mapped operator in a DAG"""
 
-    operator_class: Type[BaseOperator] = attr.ib(repr=lambda c: c.__name__)
+    @staticmethod
+    def _operator_class_repr(val):
+        # Can be a string if we are de-serialized
+        if isinstance(val, str):
+            return val.rsplit('.', 1)[-1]
+        return val.__name__
+
+    def __repr__(self) -> str:
+        return (
+            'MappedOperator(operator_class={self._operator_class_repr(self.operator_class)}, '
+            + 'task_id={self.task_id!r}, partial_kwargs={self.partial_kwargs!r}, '
+            + 'mapped_kwargs={self.mapped_kwargs!r}, dag={self.dag})'

Review comment:
       Whoops!







[GitHub] [airflow] uranusjr commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r786165206



##########
File path: airflow/models/baseoperator.py
##########
@@ -207,6 +207,7 @@ def apply_defaults(self: "BaseOperator", *args: Any, **kwargs: Any) -> Any:
 
             result = func(self, **kwargs, default_args=default_args)
             # Store the args passed to init -- we need them to support task.map serialzation!
+            kwargs.pop('task_id', None)

Review comment:
       The “other” one looks better to me; it feels like we are intentionally treating `task_id` differently from all other arguments.







[GitHub] [airflow] ashb commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r781158866



##########
File path: airflow/models/baseoperator.py
##########
@@ -1663,24 +1664,64 @@ def _validate_kwarg_names_for_mapping(cls: Type[BaseOperator], func_name: str, v
         raise TypeError(f'{cls.__name__}.{func_name} got unexpected keyword arguments {names}')
 
 
-@attr.define(kw_only=True)
+def _MappedOperator_minimal_repr(cls, fields):
+    results = []
+    fields = iter(fields)
+    for field in fields:
+        results.append(field)
+        if field.name == "dag":
+            # Everything after 'dag' attribute is exluced form repr
+            break
+
+    for field in fields:
+        results.append(field.evolve(repr=False))
+    return results

Review comment:
       Fair comment.







[GitHub] [airflow] kaxil commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r784758697



##########
File path: airflow/models/baseoperator.py
##########
@@ -1672,20 +1677,52 @@ def _validate_kwarg_names_for_mapping(cls: Type[BaseOperator], func_name: str, v
 class MappedOperator(DAGNode):
     """Object representing a mapped operator in a DAG"""
 
-    operator_class: Type[BaseOperator] = attr.ib(repr=lambda c: c.__name__)
+    @staticmethod
+    def _operator_class_repr(val):
+        # Can be a string if we are de-serialized
+        if isinstance(val, str):
+            return val.rsplit('.', 1)[-1]
+        return val.__name__
+
+    def __repr__(self) -> str:
+        return (
+            'MappedOperator(operator_class={self._operator_class_repr(self.operator_class)}, '
+            + 'task_id={self.task_id!r}, partial_kwargs={self.partial_kwargs!r}, '
+            + 'mapped_kwargs={self.mapped_kwargs!r}, dag={self.dag})'

Review comment:
       ```suggestion
               f'MappedOperator(operator_class={self._operator_class_repr(self.operator_class)}, '
               + f'task_id={self.task_id!r}, partial_kwargs={self.partial_kwargs!r}, '
               + f'mapped_kwargs={self.mapped_kwargs!r}, dag={self.dag})'
   ```
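The effect of the missing `f` prefix is easy to reproduce on its own. The variable name below is made up for the example.

```python
task_id = "expand_me"

# Plain string literal: the braces are kept as literal text.
without_prefix = 'task_id={task_id!r}'

# f-string: the expression inside the braces is evaluated and formatted.
with_prefix = f'task_id={task_id!r}'
```

Without the prefix the repr would have shown the template text itself rather than the operator's values, and since it is still a valid string no error is raised.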







[GitHub] [airflow] uranusjr commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r786202050



##########
File path: airflow/models/baseoperator.py
##########
@@ -136,6 +136,7 @@ def _apply_defaults(cls, func: T) -> T:
         non_optional_args = {
             name for (name, param) in non_variadic_params.items() if param.default == param.empty
         }
+        non_optional_args -= {'task_id'}

Review comment:
       ```suggestion
           non_optional_args = {
               name
               for name, param in non_variadic_params.items()
               if param.default == param.empty
               and name != "task_id"
           }
   ```
   
   should be the same?
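A quick check of the "should be the same?" question, using a made-up function signature in place of a real operator's `__init__`. The construction of `non_variadic_params` here is an assumption about its shape (a name-to-parameter mapping without `self`, `*args`, or `**kwargs`), not a copy of the Airflow code.

```python
import inspect


def example_init(self, task_id, bash_command, retries=0, **kwargs):
    """Made-up signature standing in for an operator's __init__."""


variadic = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
non_variadic_params = {
    name: param
    for name, param in inspect.signature(example_init).parameters.items()
    if name != "self" and param.kind not in variadic
}

# Variant from the diff: build the set, then subtract task_id.
by_subtraction = {
    name for name, param in non_variadic_params.items() if param.default == param.empty
}
by_subtraction -= {"task_id"}

# Variant from the suggestion: filter task_id inside the comprehension.
by_filter = {
    name
    for name, param in non_variadic_params.items()
    if param.default == param.empty and name != "task_id"
}
```

For this signature both variants produce the same set, so the choice between them is a matter of readability.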







[GitHub] [airflow] uranusjr commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r786430565



##########
File path: airflow/models/baseoperator.py
##########
@@ -207,6 +207,7 @@ def apply_defaults(self: "BaseOperator", *args: Any, **kwargs: Any) -> Any:
 
             result = func(self, **kwargs, default_args=default_args)
             # Store the args passed to init -- we need them to support task.map serialzation!
+            kwargs.pop('task_id', None)

Review comment:
       Turns out this has an unfortunate side effect:
   
   ```pycon
   >>> FooOperator()
   Traceback (most recent call last):
     File "<stdin>", line 1, in <module>
   TypeError: apply_defaults() missing 1 required keyword-only argument: 'task_id'
   ```
   
   and the error message seems to be hard-coded in the interpreter and cannot be patched. I’m trying yet another approach (see new commits).
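
A rough sketch of how such a message can arise, assuming the wrapper declares `task_id` as a required keyword-only parameter. `apply_defaults_sketch` and this `FooOperator` are hypothetical and far simpler than the real code. The function name printed in front of "missing" depends on the Python version, so the sketch only relies on the rest of the message.

```python
def apply_defaults_sketch(func):
    """Hypothetical, stripped-down wrapper; the real decorator does much more."""

    def apply_defaults(self, *args, task_id, **kwargs):
        # task_id is keyword-only here, so Python itself rejects a call without it
        # before any of our own code gets a chance to raise a friendlier error.
        return func(self, *args, task_id=task_id, **kwargs)

    return apply_defaults


class FooOperator:
    @apply_defaults_sketch
    def __init__(self, task_id):
        self.task_id = task_id


try:
    FooOperator()
except TypeError as err:
    message = str(err)
    print(message)
```

The error is raised while binding arguments to the wrapper, which is why the wrapper's name rather than the operator's shows up in the traceback.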







[GitHub] [airflow] uranusjr commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r783623735



##########
File path: airflow/models/baseoperator.py
##########
@@ -207,6 +207,7 @@ def apply_defaults(self: "BaseOperator", *args: Any, **kwargs: Any) -> Any:
 
             result = func(self, **kwargs, default_args=default_args)
             # Store the args passed to init -- we need them to support task.map serialzation!
+            kwargs.pop('task_id', None)

Review comment:
       How about storing `task_id` separately instead and not in `partial_kwargs`?







[GitHub] [airflow] ashb commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r786138557



##########
File path: airflow/models/baseoperator.py
##########
@@ -207,6 +207,7 @@ def apply_defaults(self: "BaseOperator", *args: Any, **kwargs: Any) -> Any:
 
             result = func(self, **kwargs, default_args=default_args)
             # Store the args passed to init -- we need them to support task.map serialzation!
+            kwargs.pop('task_id', None)

Review comment:
       This is dealing with it in partial_kwargs -- 
   
   ```
               # Store the args passed to init -- we need them to support task.map serialzation!
               kwargs.pop('task_id', None)
               self._BaseOperator__init_kwargs.update(kwargs)  # type: ignore
   ```







[GitHub] [airflow] ashb commented on pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#issuecomment-1015258291


   I'm going to work on adding `--map-index` to task execution side.





[GitHub] [airflow] uranusjr commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r785711324



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -653,10 +677,14 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
                 v = cls._deserialize(v)
             # else use v as it is
 
-            setattr(op, k, v)
+            if hasattr(op, k) and isinstance(v, set):
+                getattr(op, k).update(v)

Review comment:
       Does this affect anything other than down/upstream_task_ids? If not, maybe we should just rename the attribute on BaseOperator to remove the `_` (removing the public property, of course), and add a special case on deserialisation to migrate those attributes instead.







[GitHub] [airflow] uranusjr commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r780893173



##########
File path: airflow/models/baseoperator.py
##########
@@ -1634,7 +1635,7 @@ def map(self, **kwargs) -> "MappedOperator":
 
 
 def _validate_kwarg_names_for_mapping(cls: Type[BaseOperator], func_name: str, value: Dict[str, Any]):
-    if isinstance(str, cls):
+    if isinstance(cls, str):

Review comment:
       Neither the old nor the new line seems to make sense? Should `cls` be annotated as `Union[Type[BaseOperator], str]` instead?
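
To make the difference between the two lines concrete, here is a small sketch. `BaseOperatorSketch` and `describe` are hypothetical names, not part of Airflow.

```python
class BaseOperatorSketch:
    """Hypothetical stand-in for BaseOperator."""


def describe(cls):
    # Old line: asks whether the built-in type `str` is an instance of `cls`.
    # isinstance() raises TypeError if its second argument is not a type,
    # so the call is guarded here and reported as None for a string input.
    old_check = isinstance(str, cls) if isinstance(cls, type) else None
    # New line: asks whether `cls` itself is a string, e.g. a dotted import path.
    new_check = isinstance(cls, str)
    return old_check, new_check


print(describe(BaseOperatorSketch))
print(describe("airflow.operators.dummy.DummyOperator"))
```

The new check is only meaningful if `cls` may really be a string, which is what the `Union[Type[BaseOperator], str]` annotation would document.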







[GitHub] [airflow] ashb commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r786197515



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -658,7 +694,8 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
             setattr(op, k, v)
 
         for k in op.get_serialized_fields() - encoded_op.keys() - cls._CONSTRUCTOR_PARAMS.keys():
-            setattr(op, k, None)
+            if not hasattr(op, k):

Review comment:
       ```suggestion
               # TODO: refactor deserializing and split BaseOperator and MappedOperator, then this check could go away.
               if not hasattr(op, k):
   ```







[GitHub] [airflow] uranusjr commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r786204849



##########
File path: airflow/models/baseoperator.py
##########
@@ -1706,25 +1701,49 @@ def _validate_kwarg_names_for_mapping(cls: Type[BaseOperator], func_name: str, v
 class MappedOperator(DAGNode):
     """Object representing a mapped operator in a DAG"""
 
-    operator_class: Type[BaseOperator] = attr.ib(repr=lambda c: c.__name__)
+    def __repr__(self) -> str:
+        return (
+            f'MappedOperator(task_type={self.task_type}, '
+            + f'task_id={self.task_id!r}, partial_kwargs={self.partial_kwargs!r}, '
+            + f'mapped_kwargs={self.mapped_kwargs!r}, dag={self.dag})'
+        )
+
+    operator_class: Union[Type[BaseOperator], str]
     task_type: str = attr.ib()
     task_id: str
     partial_kwargs: Dict[str, Any]
     mapped_kwargs: Dict[str, Any] = attr.ib(
         validator=lambda self, _, v: _validate_kwarg_names_for_mapping(self.operator_class, "map", v)
     )
     dag: Optional["DAG"] = None
-    upstream_task_ids: Set[str] = attr.ib(factory=set, repr=False)
-    downstream_task_ids: Set[str] = attr.ib(factory=set, repr=False)
-
-    task_group: Optional["TaskGroup"] = attr.ib(repr=False)
+    upstream_task_ids: Set[str] = attr.ib(factory=set)
+    downstream_task_ids: Set[str] = attr.ib(factory=set)
 
+    task_group: Optional["TaskGroup"] = attr.ib()
     # BaseOperator-like interface -- needed so we can add oursleves to the dag.tasks
-    start_date: Optional[pendulum.DateTime] = attr.ib(repr=False, default=None)
-    end_date: Optional[pendulum.DateTime] = attr.ib(repr=False, default=None)
+    start_date: Optional[pendulum.DateTime] = attr.ib(default=None)
+    end_date: Optional[pendulum.DateTime] = attr.ib(default=None)
     owner: str = attr.ib(repr=False, default=conf.get("operators", "DEFAULT_OWNER"))
     max_active_tis_per_dag: Optional[int] = attr.ib(default=None)
 
+    # Needed for SerializedBaseOperator
+    _is_dummy: bool = attr.ib()
+
+    deps: Iterable[BaseTIDep] = attr.ib()
+    operator_extra_links: Iterable['BaseOperatorLink'] = ()
+    params: Union[ParamsDict, dict] = attr.ib(factory=ParamsDict)
+    template_fields: Iterable[str] = attr.ib()
+
+    @_is_dummy.default
+    def _is_dummy_default(self):
+        from airflow.operators.dummy import DummyOperator
+
+        return issubclass(self.operator_class, DummyOperator)
+
+    @deps.default
+    def _deps_from_class(self):
+        return self.operator_class.deps

Review comment:
       I’m assuming many of these defaults (this, `_is_dummy`, and `template_fields`, I believe) don’t need to consider the case where `operator_class` is a str, because in that case these values would’ve been supplied explicitly instead. (It’s kind of bad that it’s designed this way, but I guess that can be said for many things regarding the current serialisation implementation...)

##########
File path: airflow/models/baseoperator.py
##########
@@ -1706,25 +1701,49 @@ def _validate_kwarg_names_for_mapping(cls: Type[BaseOperator], func_name: str, v
 class MappedOperator(DAGNode):
     """Object representing a mapped operator in a DAG"""
 
-    operator_class: Type[BaseOperator] = attr.ib(repr=lambda c: c.__name__)
+    def __repr__(self) -> str:
+        return (
+            f'MappedOperator(task_type={self.task_type}, '
+            + f'task_id={self.task_id!r}, partial_kwargs={self.partial_kwargs!r}, '
+            + f'mapped_kwargs={self.mapped_kwargs!r}, dag={self.dag})'

Review comment:
       ```suggestion
               f'MappedOperator(task_type={self.task_type}, '
               f'task_id={self.task_id!r}, partial_kwargs={self.partial_kwargs!r}, '
               f'mapped_kwargs={self.mapped_kwargs!r}, dag={self.dag})'
   ```
   
   implicit string concatenation
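
A short sketch of the point, with hypothetical values for `task_type` and `task_id`: adjacent string literals, f-strings included, are joined by the parser, so the explicit `+` is not needed.

```python
task_type = "DummyOperator"
task_id = "my_task"

# Explicit concatenation, as in the diff.
with_plus = (
    f'MappedOperator(task_type={task_type}, '
    + f'task_id={task_id!r})'
)

# Implicit concatenation of adjacent literals, as in the suggestion.
implicit = (
    f'MappedOperator(task_type={task_type}, '
    f'task_id={task_id!r})'
)

print(with_plus == implicit)
```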







[GitHub] [airflow] ashb commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r786222539



##########
File path: airflow/models/baseoperator.py
##########
@@ -1706,25 +1701,49 @@ def _validate_kwarg_names_for_mapping(cls: Type[BaseOperator], func_name: str, v
 class MappedOperator(DAGNode):
     """Object representing a mapped operator in a DAG"""
 
-    operator_class: Type[BaseOperator] = attr.ib(repr=lambda c: c.__name__)
+    def __repr__(self) -> str:
+        return (
+            f'MappedOperator(task_type={self.task_type}, '
+            + f'task_id={self.task_id!r}, partial_kwargs={self.partial_kwargs!r}, '
+            + f'mapped_kwargs={self.mapped_kwargs!r}, dag={self.dag})'
+        )
+
+    operator_class: Union[Type[BaseOperator], str]
     task_type: str = attr.ib()
     task_id: str
     partial_kwargs: Dict[str, Any]
     mapped_kwargs: Dict[str, Any] = attr.ib(
         validator=lambda self, _, v: _validate_kwarg_names_for_mapping(self.operator_class, "map", v)
     )
     dag: Optional["DAG"] = None
-    upstream_task_ids: Set[str] = attr.ib(factory=set, repr=False)
-    downstream_task_ids: Set[str] = attr.ib(factory=set, repr=False)
-
-    task_group: Optional["TaskGroup"] = attr.ib(repr=False)
+    upstream_task_ids: Set[str] = attr.ib(factory=set)
+    downstream_task_ids: Set[str] = attr.ib(factory=set)
 
+    task_group: Optional["TaskGroup"] = attr.ib()
     # BaseOperator-like interface -- needed so we can add oursleves to the dag.tasks
-    start_date: Optional[pendulum.DateTime] = attr.ib(repr=False, default=None)
-    end_date: Optional[pendulum.DateTime] = attr.ib(repr=False, default=None)
+    start_date: Optional[pendulum.DateTime] = attr.ib(default=None)
+    end_date: Optional[pendulum.DateTime] = attr.ib(default=None)
     owner: str = attr.ib(repr=False, default=conf.get("operators", "DEFAULT_OWNER"))
     max_active_tis_per_dag: Optional[int] = attr.ib(default=None)
 
+    # Needed for SerializedBaseOperator
+    _is_dummy: bool = attr.ib()
+
+    deps: Iterable[BaseTIDep] = attr.ib()
+    operator_extra_links: Iterable['BaseOperatorLink'] = ()
+    params: Union[ParamsDict, dict] = attr.ib(factory=ParamsDict)
+    template_fields: Iterable[str] = attr.ib()
+
+    @_is_dummy.default
+    def _is_dummy_default(self):
+        from airflow.operators.dummy import DummyOperator
+
+        return issubclass(self.operator_class, DummyOperator)
+
+    @deps.default
+    def _deps_from_class(self):
+        return self.operator_class.deps

Review comment:
       Yes, exactly that. I've already started thinking in my head about how to refactor/rearchitect the serialization and deserialization.







[GitHub] [airflow] ashb merged pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb merged pull request #20743:
URL: https://github.com/apache/airflow/pull/20743


   





[GitHub] [airflow] kaxil commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r784801107



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -586,9 +604,24 @@ def serialize_operator(cls, op: BaseOperator) -> Dict[str, Any]:
         return serialize_op
 
     @classmethod
-    def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
+    def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> Union[BaseOperator, MappedOperator]:
         """Deserializes an operator from a JSON object."""
-        op = SerializedBaseOperator(task_id=encoded_op['task_id'])
+        op: Union[BaseOperator, MappedOperator]
+        # Check if it's a mapped operator
+        if "mapped_kwargs" in encoded_op:

Review comment:
       The following line does, no?
   
   https://github.com/apache/airflow/blob/3ef6268e0039f40a6380394516bdf4b587bf7a4d/airflow/serialization/serialized_objects.py#L550







[GitHub] [airflow] kaxil commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r784799071



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -586,9 +604,24 @@ def serialize_operator(cls, op: BaseOperator) -> Dict[str, Any]:
         return serialize_op
 
     @classmethod
-    def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
+    def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> Union[BaseOperator, MappedOperator]:
         """Deserializes an operator from a JSON object."""
-        op = SerializedBaseOperator(task_id=encoded_op['task_id'])
+        op: Union[BaseOperator, MappedOperator]
+        # Check if it's a mapped operator
+        if "mapped_kwargs" in encoded_op:
+            op = MappedOperator(
+                task_id=encoded_op['task_id'],
+                dag=None,
+                operator_class='.'.join(filter(None, (encoded_op['_task_module'], encoded_op['_task_type']))),
+                # These are all re-set later
+                partial_kwargs={},

Review comment:
       Shouldn't `partial_kwargs={}` be a default on the `MappedOperator` itself?







[GitHub] [airflow] uranusjr commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r781911926



##########
File path: airflow/models/baseoperator.py
##########
@@ -1634,7 +1635,7 @@ def map(self, **kwargs) -> "MappedOperator":
 
 
 def _validate_kwarg_names_for_mapping(cls: Type[BaseOperator], func_name: str, value: Dict[str, Any]):
-    if isinstance(str, cls):
+    if isinstance(cls, str):

Review comment:
       I pushed a commit to fix this (and a few other annotations)







[GitHub] [airflow] ashb commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r781159103



##########
File path: airflow/models/baseoperator.py
##########
@@ -1741,6 +1788,28 @@ def leaves(self) -> List["MappedOperator"]:
     def has_dag(self):
         return self.dag is not None
 
+    @property
+    def inherits_from_dummy_operator(self):
+        """Used to determine if an Operator is inherited from DummyOperator"""
+        # This looks like `isinstance(self, DummyOperator) would work, but this also
+        # needs to cope when `self` is a Serialized instance of a DummyOperator or one
+        # of its sub-classes (which don't inherit from anything but BaseOperator).
+        return getattr(self, '_is_dummy', False)

Review comment:
       Possibly can't be, I'll check.







[GitHub] [airflow] ashb commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r781158658



##########
File path: airflow/models/baseoperator.py
##########
@@ -207,6 +207,7 @@ def apply_defaults(self: "BaseOperator", *args: Any, **kwargs: Any) -> Any:
 
             result = func(self, **kwargs, default_args=default_args)
             # Store the args passed to init -- we need them to support task.map serialzation!
+            kwargs.pop('task_id', None)

Review comment:
       Delete the task_id from the kwargs (so it isn't stored in the mapped data structure) -- so `del kwargs['task_id']` but without the try/except for when it's not there? I'll double check if I can just do this with a `del`.
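
For reference, a small sketch of the difference between the two spellings. The helper names are hypothetical; both work on a copy so the caller's dict is left alone.

```python
def strip_with_pop(kwargs):
    """Remove task_id if present; silently do nothing otherwise."""
    kwargs = dict(kwargs)
    kwargs.pop("task_id", None)
    return kwargs


def strip_with_del(kwargs):
    """Remove task_id; raises KeyError when the key is absent."""
    kwargs = dict(kwargs)
    del kwargs["task_id"]
    return kwargs


print(strip_with_pop({"task_id": "t1", "retries": 1}))
print(strip_with_pop({"retries": 1}))

try:
    strip_with_del({"retries": 1})
except KeyError as err:
    print("del failed:", err)
```

So a plain `del` is only safe if `task_id` is guaranteed to be in `kwargs` at that point.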







[GitHub] [airflow] ashb commented on pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#issuecomment-1013160601


   I'm now saying this is "good enough" -- we can make changes later if we want to





[GitHub] [airflow] ashb commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r784798716



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -586,9 +604,24 @@ def serialize_operator(cls, op: BaseOperator) -> Dict[str, Any]:
         return serialize_op
 
     @classmethod
-    def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
+    def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> Union[BaseOperator, MappedOperator]:
         """Deserializes an operator from a JSON object."""
-        op = SerializedBaseOperator(task_id=encoded_op['task_id'])
+        op: Union[BaseOperator, MappedOperator]
+        # Check if it's a mapped operator
+        if "mapped_kwargs" in encoded_op:
+            op = MappedOperator(
+                task_id=encoded_op['task_id'],
+                dag=None,
+                operator_class='.'.join(filter(None, (encoded_op['_task_module'], encoded_op['_task_type']))),

Review comment:
       I honestly don't know -- because module and type are both required.
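
For context, a sketch of what the `'.'.join(filter(None, ...))` expression in the diff does. `dotted_path` is a hypothetical helper name; the expression inside it is copied from the diff.

```python
def dotted_path(task_module, task_type):
    # filter(None, ...) drops falsy parts (None or ""), so a missing module
    # yields just the type name instead of a path with a leading dot.
    return '.'.join(filter(None, (task_module, task_type)))


print(dotted_path("airflow.operators.dummy", "DummyOperator"))
print(dotted_path(None, "DummyOperator"))
print(dotted_path("", "DummyOperator"))
```

If both fields are always present, as the reply says, the filter never drops anything and a plain join would give the same result.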







[GitHub] [airflow] ashb commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r784798915



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -586,9 +604,24 @@ def serialize_operator(cls, op: BaseOperator) -> Dict[str, Any]:
         return serialize_op
 
     @classmethod
-    def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
+    def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> Union[BaseOperator, MappedOperator]:
         """Deserializes an operator from a JSON object."""
-        op = SerializedBaseOperator(task_id=encoded_op['task_id'])
+        op: Union[BaseOperator, MappedOperator]
+        # Check if it's a mapped operator
+        if "mapped_kwargs" in encoded_op:

Review comment:
       We don't store _is_mapped currently.







[GitHub] [airflow] uranusjr commented on pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#issuecomment-1015230910


   I’ll start working on attribute parity between BaseOperator and MappedOperator.





[GitHub] [airflow] ashb commented on pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#issuecomment-1015217644


   Looks good -- thanks for picking up those fixes TP.





[GitHub] [airflow] ashb commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r786200271



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -653,10 +677,14 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
                 v = cls._deserialize(v)
             # else use v as it is
 
-            setattr(op, k, v)
+            if hasattr(op, k) and isinstance(v, set):
+                getattr(op, k).update(v)

Review comment:
       Ah no, can't remove that one yet. I've added a TODO comment to refactor and remove it later.







[GitHub] [airflow] ashb commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r784867350



##########
File path: airflow/models/baseoperator.py
##########
@@ -207,6 +207,7 @@ def apply_defaults(self: "BaseOperator", *args: Any, **kwargs: Any) -> Any:
 
             result = func(self, **kwargs, default_args=default_args)
             # Store the args passed to init -- we need them to support task.map serialzation!
+            kwargs.pop('task_id', None)

Review comment:
       Oh right, I remember the problem now. Because we end up calling `apply_defaults` multiple times (once for each class in the MRO), `task_id` gets added to kwargs for the `super().__init__(**kwargs)` call, so the `pop` here makes sure it doesn't get stored in the partial kwargs.
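
A simplified sketch of that behaviour. Everything here (`apply_defaults_sketch`, `Base`, `Child`, `init_kwargs`) is hypothetical and only mimics the shape of the real wrapper, which also merges `default_args` and does other work.

```python
import functools


def apply_defaults_sketch(func):
    """Hypothetical wrapper that records the kwargs each __init__ received."""

    @functools.wraps(func)
    def wrapper(self, **kwargs):
        result = func(self, **kwargs)
        if not hasattr(self, "init_kwargs"):
            self.init_kwargs = {}
        recorded = dict(kwargs)
        # Without this pop, task_id would be recorded on every level of the MRO,
        # because it is forwarded through super().__init__(**kwargs).
        recorded.pop("task_id", None)
        self.init_kwargs.update(recorded)
        return result

    return wrapper


class Base:
    @apply_defaults_sketch
    def __init__(self, task_id, **kwargs):
        self.task_id = task_id


class Child(Base):
    @apply_defaults_sketch
    def __init__(self, command, **kwargs):
        super().__init__(**kwargs)  # the wrapper runs again for Base.__init__
        self.command = command


op = Child(task_id="t1", command="echo")
print(op.init_kwargs)
```

The wrapper runs once per decorated `__init__` in the chain, and only the non-`task_id` arguments end up in the recorded kwargs.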







[GitHub] [airflow] kaxil commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r784796827



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -586,9 +604,24 @@ def serialize_operator(cls, op: BaseOperator) -> Dict[str, Any]:
         return serialize_op
 
     @classmethod
-    def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
+    def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> Union[BaseOperator, MappedOperator]:
         """Deserializes an operator from a JSON object."""
-        op = SerializedBaseOperator(task_id=encoded_op['task_id'])
+        op: Union[BaseOperator, MappedOperator]
+        # Check if it's a mapped operator
+        if "mapped_kwargs" in encoded_op:

Review comment:
       The following might be a better (more explicit) condition?
   ```suggestion
           if encoded_op.get("_is_mapped"):
   ```
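
A rough illustration of why an explicit marker can be safer than inferring the type from another key. The helper name `is_mapped` and the sample payloads are made up for this sketch; only the `_is_mapped` and `mapped_kwargs` keys come from the discussion.

```python
from typing import Any, Dict


def is_mapped(encoded_op: Dict[str, Any]) -> bool:
    # An explicit marker states intent; .get() also tolerates payloads
    # that were written without the marker.
    return bool(encoded_op.get("_is_mapped"))


plain = {"task_id": "a"}
mapped = {"task_id": "b", "_is_mapped": True, "mapped_kwargs": {}}
# Hypothetical payload that merely happens to carry a "mapped_kwargs" key.
lookalike = {"task_id": "c", "mapped_kwargs": {}}

for payload in (plain, mapped, lookalike):
    print(payload["task_id"], is_mapped(payload), "mapped_kwargs" in payload)
```

The key-presence check and the flag check disagree only on the `lookalike` payload, which is the case the explicit condition guards against.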







[GitHub] [airflow] kaxil commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r784800398



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -586,9 +604,24 @@ def serialize_operator(cls, op: BaseOperator) -> Dict[str, Any]:
         return serialize_op
 
     @classmethod
-    def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
+    def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> Union[BaseOperator, MappedOperator]:
         """Deserializes an operator from a JSON object."""
-        op = SerializedBaseOperator(task_id=encoded_op['task_id'])
+        op: Union[BaseOperator, MappedOperator]
+        # Check if it's a mapped operator
+        if "mapped_kwargs" in encoded_op:
+            op = MappedOperator(
+                task_id=encoded_op['task_id'],
+                dag=None,
+                operator_class='.'.join(filter(None, (encoded_op['_task_module'], encoded_op['_task_type']))),
+                # These are all re-set later
+                partial_kwargs={},
+                mapped_kwargs={},
+                deps=tuple(),
+                is_dummy=False,
+                template_fields=(),

Review comment:
       Same for these three, shouldn't we have a default on `MappedOperator` itself?







[GitHub] [airflow] ashb commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r784879013



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -653,10 +677,14 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
                 v = cls._deserialize(v)
             # else use v as it is
 
-            setattr(op, k, v)
+            if hasattr(op, k) and isinstance(v, set):
+                getattr(op, k).update(v)

Review comment:
       I've changed this slightly to:
   
   ```
   try:
       setattr(op, k, v)
   except AttributeError:
       # Read only value, update in place
       getattr(op, k).update(v)
   ```
   
   still a bit hacky, but marginally less so. I'll revisit this in the future.
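
For reference, a self-contained sketch of that fallback. The `Op` class and the `restore` helper are hypothetical; the point is only that assigning to a property without a setter raises `AttributeError`, which the loop turns into an in-place update.

```python
class Op:
    def __init__(self):
        self._upstream_task_ids = set()
        self.owner = "airflow"

    @property
    def upstream_task_ids(self):
        # No setter defined, so assignment raises AttributeError.
        return self._upstream_task_ids


def restore(op, fields):
    for k, v in fields.items():
        try:
            setattr(op, k, v)
        except AttributeError:
            # Read-only property: mutate the set it returns instead.
            getattr(op, k).update(v)


op = Op()
restore(op, {"owner": "data-team", "upstream_task_ids": {"a", "b"}})
print(op.owner, sorted(op.upstream_task_ids))
```

One caveat of this shape: the fallback assumes every read-only field returns something with an `update` method, which is part of why it still feels hacky.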







[GitHub] [airflow] uranusjr commented on pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#issuecomment-1015152810


   Seems to be working. If the latest two commits look good to you @ashb this is ready to go in.





[GitHub] [airflow] uranusjr commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r786165855



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -653,10 +677,14 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
                 v = cls._deserialize(v)
             # else use v as it is
 
-            setattr(op, k, v)
+            if hasattr(op, k) and isinstance(v, set):
+                getattr(op, k).update(v)

Review comment:
       Does the change in `deserialize_operator` also need to be updated?







[GitHub] [airflow] ashb commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r786143028



##########
File path: airflow/models/baseoperator.py
##########
@@ -207,6 +207,7 @@ def apply_defaults(self: "BaseOperator", *args: Any, **kwargs: Any) -> Any:
 
             result = func(self, **kwargs, default_args=default_args)
             # Store the args passed to init -- we need them to support task.map serialzation!
+            kwargs.pop('task_id', None)

Review comment:
       This is the other option:
   
   ```diff
   diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
   index 752b8a6cf..71170320d 100644
   --- a/airflow/models/baseoperator.py
   +++ b/airflow/models/baseoperator.py
   @@ -136,6 +136,7 @@ class BaseOperatorMeta(abc.ABCMeta):
            non_optional_args = {
                name for (name, param) in non_variadic_params.items() if param.default == param.empty
            }
   +        non_optional_args -= {'task_id'}
    
            class autostacklevel_warn:
                def __init__(self):
   @@ -158,7 +159,7 @@ class BaseOperatorMeta(abc.ABCMeta):
                func.__globals__['warnings'] = autostacklevel_warn()
    
            @functools.wraps(func)
   -        def apply_defaults(self: "BaseOperator", *args: Any, **kwargs: Any) -> Any:
   +        def apply_defaults(self: "BaseOperator", *args: Any, task_id: str, **kwargs: Any) -> Any:
                from airflow.models.dag import DagContext
                from airflow.utils.task_group import TaskGroupContext
    
   @@ -201,15 +202,15 @@ class BaseOperatorMeta(abc.ABCMeta):
    
                hook = getattr(self, '_hook_apply_defaults', None)
                if hook:
   -                args, kwargs = hook(**kwargs, default_args=default_args)
   +                args, kwargs = hook(task_id=task_id, **kwargs, default_args=default_args)
   +                task_id = kwargs.pop('task_id')
                    default_args = kwargs.pop('default_args', {})
    
                if not hasattr(self, '_BaseOperator__init_kwargs'):
                    self._BaseOperator__init_kwargs = {}
    
   -            result = func(self, **kwargs, default_args=default_args)
   +            result = func(self, **kwargs, task_id=task_id, default_args=default_args)
                # Store the args passed to init -- we need them to support task.map serialzation!
   -            kwargs.pop('task_id', None)
                self._BaseOperator__init_kwargs.update(kwargs)  # type: ignore
    
                # Here we set upstream task defined by XComArgs passed to template fields of the operator
   
   ```
   
   Which do you think is better @uranusjr?
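
Stripped of the Airflow specifics, the idea in that diff can be sketched as below. The decorator, the `init_kwargs` attribute and the `Task` class are hypothetical; the sketch only shows what binding `task_id` as a keyword-only parameter of the wrapper does to `**kwargs`.

```python
import functools


def apply_defaults(func):
    @functools.wraps(func)
    def wrapper(self, *args, task_id, **kwargs):
        # task_id is bound to its own parameter, so kwargs never contains it
        # and nothing needs to be popped before recording.
        result = func(self, *args, task_id=task_id, **kwargs)
        self.init_kwargs = dict(kwargs)
        return result

    return wrapper


class Task:
    @apply_defaults
    def __init__(self, task_id, retries=0):
        self.task_id = task_id
        self.retries = retries


op = Task(task_id="t1", retries=2)
print(op.init_kwargs)

try:
    Task(retries=1)
    missing_task_id_rejected = False
except TypeError:
    # A missing task_id now fails in the wrapper's own signature.
    missing_task_id_rejected = True
```

The trade-off is that the wrapper's signature, rather than the wrapped `__init__`, becomes the place where a missing `task_id` is reported.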







[GitHub] [airflow] ashb commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r786138557



##########
File path: airflow/models/baseoperator.py
##########
@@ -207,6 +207,7 @@ def apply_defaults(self: "BaseOperator", *args: Any, **kwargs: Any) -> Any:
 
             result = func(self, **kwargs, default_args=default_args)
             # Store the args passed to init -- we need them to support task.map serialzation!
+            kwargs.pop('task_id', None)

Review comment:
       This is dealing with it in partial_kwargs -- 
   
   ```
               # Store the args passed to init -- we need them to support task.map serialzation!
               kwargs.pop('task_id', None)
               self._BaseOperator__init_kwargs.update(kwargs)  # type: ignore
   ```
   
   (The only use of `__init_kwargs` is for building partial_kwargs when mapping a task)
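
The `_BaseOperator__init_kwargs` spelling comes from Python's name mangling of double-underscore attributes. A small sketch with a stand-in class (not the real `BaseOperator`):

```python
class BaseOperator:
    def __init__(self, **kwargs):
        # Inside the class body, __init_kwargs is mangled to
        # _BaseOperator__init_kwargs.
        self.__init_kwargs = dict(kwargs)


op = BaseOperator(retries=3)
# Code outside the class body (such as a wrapper installed by a metaclass)
# has to spell out the mangled name.
stored = op._BaseOperator__init_kwargs
print(stored)
```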







[GitHub] [airflow] ashb commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r786199495



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -658,7 +694,9 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
             setattr(op, k, v)
 
         for k in op.get_serialized_fields() - encoded_op.keys() - cls._CONSTRUCTOR_PARAMS.keys():
-            setattr(op, k, None)
+            # TODO: refactor deserialization of BaseOperator and MappedOperaotr (split it out), then check could go away.

Review comment:
       ```suggestion
                # TODO: refactor deserialization of BaseOperator and MappedOperaotr (split it out), then check
                # could go away.
   ```

##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -658,7 +694,9 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
             setattr(op, k, v)
 
         for k in op.get_serialized_fields() - encoded_op.keys() - cls._CONSTRUCTOR_PARAMS.keys():
-            setattr(op, k, None)
+            # TODO: refactor deserialization of BaseOperator and MappedOperaotr (split it out), then check could go away.

Review comment:
       ```suggestion
               # TODO: refactor deserialization of BaseOperator and MappedOperaotr (split it out), then check
               # could go away.
   ```







[GitHub] [airflow] ashb commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r786197515



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -658,7 +694,8 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
             setattr(op, k, v)
 
         for k in op.get_serialized_fields() - encoded_op.keys() - cls._CONSTRUCTOR_PARAMS.keys():
-            setattr(op, k, None)
+            if not hasattr(op, k):

Review comment:
       ```suggestion
               # TODO: refactor deserialization of BaseOperator and MappedOperaotr (split it out), then check could go away.
               if not hasattr(op, k):
   ```
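
A sketch of what the guard is assumed to do, using a hypothetical `Op` class and `fill_missing` helper: fields that were neither serialized nor passed to the constructor default to `None`, unless the object already provides them.

```python
class Op:
    is_dummy = True  # already provided, here as a class attribute


def fill_missing(op, serialized_fields, encoded, constructor_params):
    missing = set(serialized_fields) - set(encoded) - set(constructor_params)
    for k in missing:
        # Without this check, an existing value would be overwritten with None.
        if not hasattr(op, k):
            setattr(op, k, None)


op = Op()
fill_missing(op, {"is_dummy", "pool", "task_id"}, {"task_id": "t"}, {})
print(op.is_dummy, op.pool)
```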







[GitHub] [airflow] ashb commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r781161629



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -653,10 +677,14 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
                 v = cls._deserialize(v)
             # else use v as it is
 
-            setattr(op, k, v)
+            if hasattr(op, k) and isinstance(v, set):
+                getattr(op, k).update(v)

Review comment:
       ```python
       @property
       def upstream_task_ids(self) -> Set[str]:
           """@property: set of ids of tasks directly upstream"""
           return self._upstream_task_ids
   
       @property
       def downstream_task_ids(self) -> Set[str]:
           """@property: set of ids of tasks directly downstream"""
           return self._downstream_task_ids
   ```
   
   Those can't be assigned to, but the set they return is mutable. So it's related to the `downstream_task_ids` vs `_downstream_task_ids` question -- I think the change might be to make `downstream_task_ids` a normal set attribute on BaseOperator (rather than have `_downstream_task_ids` on MappedOperator) -- I don't like storing/accessing the "private" fields here.
   
   I'll see if that can be done without breaking the old serialization format.
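
As a sketch of the plain-attribute alternative (the `Task` class is hypothetical and says nothing about how the real classes ended up), a public set attribute can be both reassigned and mutated in place, so no private field is needed:

```python
from typing import Set


class Task:
    def __init__(self) -> None:
        # Plain public attributes: readable, assignable, and mutable in place.
        self.upstream_task_ids: Set[str] = set()
        self.downstream_task_ids: Set[str] = set()


task = Task()
setattr(task, "downstream_task_ids", {"b", "c"})  # no AttributeError
task.upstream_task_ids.add("a")
print(sorted(task.upstream_task_ids), sorted(task.downstream_task_ids))
```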







[GitHub] [airflow] ashb commented on a change in pull request #20743: Serialize mapped tasks and task groups

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r781158658



##########
File path: airflow/models/baseoperator.py
##########
@@ -207,6 +207,7 @@ def apply_defaults(self: "BaseOperator", *args: Any, **kwargs: Any) -> Any:
 
             result = func(self, **kwargs, default_args=default_args)
             # Store the args passed to init -- we need them to support task.map serialzation!
+            kwargs.pop('task_id', None)

Review comment:
       This deletes the task_id from the kwargs (so it isn't stored in the mapped data structure) -- it's `del kwargs['task_id']`, but without needing a try/except for when it's not there. I'll double check if I can just do this with a `del`.
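
The difference between the two spellings, in a standalone snippet (the dict contents are made up):

```python
kwargs = {"retries": 2}  # no task_id present

# pop with a default returns the default and raises nothing.
removed = kwargs.pop("task_id", None)

try:
    del kwargs["task_id"]
    del_raised = False
except KeyError:
    # del on a missing key raises KeyError.
    del_raised = True

print(removed, del_raised, kwargs)
```

So a bare `del` is only safe if `task_id` is guaranteed to be in kwargs on every call.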



