You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/31 16:34:28 UTC

[GitHub] [airflow] cansjt opened a new issue, #25431: Operator deepcopy does not honor `shallow_copy_attrs` if value also referenced in `__init_kwargs`

cansjt opened a new issue, #25431:
URL: https://github.com/apache/airflow/issues/25431

   ### Apache Airflow version
   
   2.3.3 (latest released)
   
   ### What happened
   
   When implementing a custom operator I stumbled on the following issue:
   - There is a way to indicate that an attibute of operator should be shallow-copied, instead of deepcopied (by adding its name to the `<OperatorClass>.shallow_copy_attrs` class attribute);
   - Most operator attributes see their value set in the operator's `__init__()` method;
   - The arguments given to an operator's `__init__()` method are captured in the instances [`_BaseOperator__init_kwargs`](https://github.com/apache/airflow/blob/main/airflow/models/baseoperator.py#L680) attribute;
   - When an operator instance is deepcopied, the `_BaseOperator__init_kwargs` is also copied;
   - If the value of an attribute was given to the `__init__()` then it is deepcopied anyway, when copying the content of the `_BaseOperator__init_kwargs` dictionary, despite being specified as that it should not be deepcopied;
   
   Note that there is an additional difficulty to this problem: the names of the `kwargs` do not necessarily match the name of the instance attribute. It can be easily worked around, by adding both name to the class' `shallow_copy_attrs` list. That things a bit redundant, though.
   
   Not sure what the `_BaseOperator__init_kwargs` is used for, but if one can deepcopy an operator, I cannot help but wonder why it is needed?
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   I discovered the issue because, in a custom operator I was passing a object (from a third party package) that has a mis-implemented `__getattr__()` and it was getting copied anyways, having my DAG fall in an infinite recursion when attempting to copy it.
   
   Note that for brevity I also took a little shortcut: in the real case, the faulty instance is not directly attached to the operator instance but an attribute on another object that is itself attached to the operator. Which could make the use of the `shallow_copy_attrs` effective where it is here ineffective in the first example. One can consider that the adapter class, in the examples below, takes the role of the intermediate object. Still the intermediate being deepcopied, when it shouldn't, make set `shallow_copy_attrs` ineffective.
   
   This first example, shows the initial situation:
   ```python
   import copy
   
   from airflow.operators.python import PythonOperator
   
   
   class ThirdPartyClassWithMisbehavedGetAttr:
   
       def __getattr__(self, item):
           if attr := getattr(self, f'_{item}'):
               return attr
           raise AttributeError
   
   
   class CustomOperator(PythonOperator):
   
       shallow_copy_attrs = ('_misbehavedparam', 'misbehavedparam')
   
       def __init__(self,
                    *,
                    misbehavedparam: None | ThirdPartyClassWithMisbehavedGetAttr = None,
                    **kwargs):
           super().__init__(**kwargs)
           self._misbehavedparam = misbehavedparam or ThirdPartyClassWithMisbehavedGetAttr()
   
       def __deepcopy__(self, *args):  # Only here to intercept the call and allow debuging
           breakpoint()
           super().__deepcopy__(*args)
   
   
   def f(**kwargs):
       print('here')
   
   
   operator1 = CustomOperator(python_callable=f, task_id='doit')
   print(operator1._BaseOperator__init_kwargs)
   result = copy.deepcopy(operator1)  # Infinite recursion loop.
   ```
   Running the code above fails with the exception:
   ```
   Traceback (most recent call last):
     File "/home/nicolas/src/gitco/jobteaser/aggregator/mre.py", line 11, in __getattr__
       if attr := getattr(self, f'_{item}'):
     File "/home/nicolas/src/gitco/jobteaser/aggregator/mre.py", line 11, in __getattr__
       if attr := getattr(self, f'_{item}'):
     File "/home/nicolas/src/gitco/jobteaser/aggregator/mre.py", line 11, in __getattr__
       if attr := getattr(self, f'_{item}'):
     [Previous line repeated 997 more times]
   RecursionError: maximum recursion depth exceeded
   ``` 
   
   Here is a way to work around the problem. Assuming the adapter class below can somehow reconstruct the faulty instance. In my case it is possible (not showed here for brevity):
   ```python
   class CustomOperatorWithWorkAround(PythonOperator):
   
       def __init__(self,
                    *,
                    misbehavedparam: None | ThirdPartyClassWithMisbehavedGetAttr = None,
                    **kwargs):
           super().__init__(**kwargs)
           self._misbehavedparam = MisbehavedAdapter(misbehavedparam
                                                     or ThirdPartyClassWithMisbehavedGetAttr())
   
       def __deepcopy__(self, *args):
           breakpoint()
           super().__deepcopy__(*args)
   
   
   class MisbehavedAdapter:
   
       def __init__(self, adaptee):
           self._adaptee = adaptee
   
       @classmethod
       def copy(cls) -> 'MisbehavedAdapter':
           # Reconstruct the fault instance somehow
           return MisbehavedAdapter(ThirdPartyClassWithMisbehavedGetAttr())
   
       def __getattr__(self, item):
           return getattr(self._adaptee, item)
   
       def __reduce_ex__(self, version):
           return (MisbehavedAdapter.copy, tuple())
   
   
   operator2 = CustomOperatorWithWorkAround(  # misbehavedparam=ThirdPartyClassWithMisbehavedGetAttr(),
                                            python_callable=f,
                                            task_id='doit',
                                            )
   print(operator2._BaseOperator__init_kwargs)
   result = copy.deepcopy(operator2)
   ```
   It's a bit of work but until the third party library is fixed, that works for me.
   
   Now if I uncomment the `fdupparam` kwarg in the above example:
   ```python
   operator3 = CustomOperatorWithWorkAround(misbehavedparam=ThirdPartyClassWithMisbehavedGetAttr(),
                                            python_callable=f,
                                            task_id='doit',
                                            )
   print(operator3._BaseOperator__init_kwargs)
   result = copy.deepcopy(operator3)
   ```
   The infinite recursion is back again, for the reasons exposed above (copy of the `_BaseOperator__init_kwargs` attribute, which has captured a reference to the faulty instance)
   
   So for now, to work around the problem, I have to set the instance attribute outside of the constructor, I added a setter (:cry:) to wrap it with the adapter:
   ```python
   class CustomOperatorWithWorkAround(PythonOperator):
   
       @property
       def misbehavedparam(self):
           return self._fdupparam
   
       @misbehavedparam.setter
       def misbehaved(self, value):
           self._fdupparam = MisbehavedAdapter(value)
   
       def __deepcopy__(self, *args):
           # breakpoint()
           return super().__deepcopy__(*args)
   
   
   operator4 = CustomOperatorWithWorkAround(python_callable=f,
                                            task_id='doit',
                                            )
   operator4.fdupparam = ThirdPartyClassWithFdUpGetAttr()
   print(operator4._BaseOperator__init_kwargs)
   result = copy.deepcopy(operator4)
   print(result)
   operator2 = CustomOperatorWithWorkAround(fdupparam=ThirdPartyClassWithFdUpGetAttr(),
                                            python_callable=f,
                                            task_id='doit',
                                            )
   ```
   
   
   ### Operating System
   
   Debian
   
   ### Versions of Apache Airflow Providers
   
   not relevant.
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   not relevant
   
   ### Anything else
   
   We could make a special case of the copy of that attribute. There is already one for the copy of the [`_BaseOperator__instantiated`](https://github.com/apache/airflow/blob/main/airflow/models/baseoperator.py#L1168) one. Remains the question of how do we want to handle that special case?
   - Have users list kwargs as part of the `shallow_copy_attrs` list? I find it is exposing and burdening users with implementation details they should not care about.
   - Do something a bit more complex, but also probably slower? e.g. I was thinking of doing something like:
   ```diff
   diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
   index 2795a0f53..c312beb1d 100644
   --- a/airflow/models/baseoperator.py
   +++ b/airflow/models/baseoperator.py
   @@ -1164,14 +1164,26 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
    
            shallow_copy = cls.shallow_copy_attrs + cls._base_operator_shallow_copy_attrs
    
   +        shallow_map = {}
            for k, v in self.__dict__.items():
   -            if k == "_BaseOperator__instantiated":
   +            if k in ("_BaseOperator__instantiated", "_BaseOperator__init_kwargs"):
                    # Don't set this until the _end_, as it changes behaviour of __setattr__
                    continue
                if k not in shallow_copy:
                    setattr(result, k, copy.deepcopy(v, memo))
                else:
                    setattr(result, k, copy.copy(v))
   +                shallow_map = {id(v): getattr(result, k)}
   +        result.__init_kwargs = init_kwargs = {}
   +        for k, v in self.__dict__["_BaseOperator__init_kwargs"]:
   +            id_ = id(v)
   +            if id_ in shallow_map:
   +                init_kwargs[k] = shallow_map[id_]
   +            elif id_ in memo:
   +                init_kwargs[k] = memo[id_]
   +            else:
   +                init_kwargs[k] = copy.deepcopy(v, memo)
   +
            result.__instantiated = self.__instantiated
            return result
    
   ```
   But then, not copying the `_BaseOperator__init_kwargs` first, breaks `__setattr__()`. I find that weird because that is kind of assuming that the name of the formal parameter is also the name of an attribute, and if so that there necessarily are related (which is likely, but nothing actually make sure it is, so to me it seems more like a broken assumption than anything).
   
   We can get a working version by moving up the initialization of the `__init_kwargs` attribute:
   ```diff
   diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
   index 2795a0f53..6507dd6c1 100644
   --- a/airflow/models/baseoperator.py
   +++ b/airflow/models/baseoperator.py
   @@ -1164,14 +1164,26 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
    
            shallow_copy = cls.shallow_copy_attrs + cls._base_operator_shallow_copy_attrs
    
   +        shallow_map = {}
   +        result.__init_kwargs = init_kwargs = {}
            for k, v in self.__dict__.items():
   -            if k == "_BaseOperator__instantiated":
   +            if k in ("_BaseOperator__instantiated", "_BaseOperator__init_kwargs"):
                    # Don't set this until the _end_, as it changes behaviour of __setattr__
                    continue
                if k not in shallow_copy:
                    setattr(result, k, copy.deepcopy(v, memo))
                else:
                    setattr(result, k, copy.copy(v))
   +                shallow_map = {id(v): getattr(result, k)}
   +        for k, v in self.__dict__["_BaseOperator__init_kwargs"]:
   +            id_ = id(v)
   +            if id_ in shallow_map:
   +                init_kwargs[k] = shallow_map[id_]
   +            elif id_ in memo:
   +                init_kwargs[k] = memo[id_]
   +            else:
   +                init_kwargs[k] = copy.deepcopy(v, memo)
   +
            result.__instantiated = self.__instantiated
            return result
    
   ```
   Shall we use the `memo` dict instead of the additional `shallow_map` dict? Could prevent the same issue to happen further down the line, if the same value is somehow referenced elsewhere. 
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on issue #25431: Operator deepcopy does not honor `shallow_copy_attrs` if value also referenced in `__init_kwargs`

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on issue #25431:
URL: https://github.com/apache/airflow/issues/25431#issuecomment-1490842273

   Yes. If you can refer to that one and have a super-easy reproducible path, creating a new issue and marking it as "Related to:" is a good idea. We can then close that one as duplicate.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [I] Operator deepcopy does not honor `shallow_copy_attrs` if value also referenced in `__init_kwargs` [airflow]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #25431:
URL: https://github.com/apache/airflow/issues/25431#issuecomment-2033707160

   This issue has been automatically marked as stale because it has been open for 365 days without any activity. There has been several Airflow releases since last activity on this issue. Kindly asking to recheck the report against latest Airflow version and let us know if the issue is reproducible. The issue will be closed in next 30 days if no further activity occurs from the issue author.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] boushphong commented on issue #25431: Operator deepcopy does not honor `shallow_copy_attrs` if value also referenced in `__init_kwargs`

Posted by "boushphong (via GitHub)" <gi...@apache.org>.
boushphong commented on issue #25431:
URL: https://github.com/apache/airflow/issues/25431#issuecomment-1445053123

   I'm facing the same problem in 2.5.0. Can I work on this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] eladkal commented on issue #25431: Operator deepcopy does not honor `shallow_copy_attrs` if value also referenced in `__init_kwargs`

Posted by GitBox <gi...@apache.org>.
eladkal commented on issue #25431:
URL: https://github.com/apache/airflow/issues/25431#issuecomment-1260535290

   @cansjt do you plan to open a PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] cansjt commented on issue #25431: Operator deepcopy does not honor `shallow_copy_attrs` if value also referenced in `__init_kwargs`

Posted by GitBox <gi...@apache.org>.
cansjt commented on issue #25431:
URL: https://github.com/apache/airflow/issues/25431#issuecomment-1200842185

   Greping through the code on the `main` branch, I cannot find any place where the `__init_kwargs` attribute is actually ever read. It is set in different places, but not read:
   ```
   $ git grep _init_kwargs
   airflow/models/baseoperator.py:406:            if not hasattr(self, '_BaseOperator__init_kwargs'):
   airflow/models/baseoperator.py:407:                self._BaseOperator__init_kwargs = {}
   airflow/models/baseoperator.py:413:            self._BaseOperator__init_kwargs.update(kwargs)  # type: ignore
   airflow/models/baseoperator.py:680:    __init_kwargs: Dict[str, Any]
   airflow/models/baseoperator.py:755:        self.__init_kwargs = {}
   airflow/models/baseoperator.py:1007:        if key in self.__init_kwargs:
   airflow/models/baseoperator.py:1008:            self.__init_kwargs[key] = value
   airflow/models/baseoperator.py:1168:        result.__init_kwargs = init_kwargs = {}
   airflow/models/baseoperator.py:1170:            if k in ("_BaseOperator__instantiated", "_BaseOperator__init_kwargs"):
   airflow/models/baseoperator.py:1178:        for k, v in self.__dict__["_BaseOperator__init_kwargs"]:
   airflow/models/baseoperator.py:1466:                    '_BaseOperator__init_kwargs',
   ```
   What am I missing?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on issue #25431: Operator deepcopy does not honor `shallow_copy_attrs` if value also referenced in `__init_kwargs`

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #25431:
URL: https://github.com/apache/airflow/issues/25431#issuecomment-1203072620

   I think if you want to propose something, it's better to open PR and discuss it there adding your explanation over the code. This will make it far more productive discussion than trying to wrap the head around copy & pasted code from various places. This will take a long time for anyone looking at it to spend their energy on, and making a draft PR with what you proposed to do is far better IMHO. I have now big knowledge about this part, but I kinda dread having to take a look and try to understand what you want to do and why, becaus of all the copy &pasted code. 
   
   Just sayin;


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] eladkal commented on issue #25431: Operator deepcopy does not honor `shallow_copy_attrs` if value also referenced in `__init_kwargs`

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on issue #25431:
URL: https://github.com/apache/airflow/issues/25431#issuecomment-1445054645

   sure. assigned 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] boushphong commented on issue #25431: Operator deepcopy does not honor `shallow_copy_attrs` if value also referenced in `__init_kwargs`

Posted by "boushphong (via GitHub)" <gi...@apache.org>.
boushphong commented on issue #25431:
URL: https://github.com/apache/airflow/issues/25431#issuecomment-1483796116

   I have noticed that this issue also leads to a slightly more severe problems when Apache Airflow is deployed on Kubernetes. The problem seems to always result in the first pod of a task being caught in the Error state while the task still finishes. I've been debugging this, but to no avail.
   Shall I raise a new issue? perhaps a better description would make it easier for someone to pick this up.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org