You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/07/06 10:45:16 UTC

[airflow] branch v2-3-test updated: Fix exception in mini task scheduler. (#24865)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-3-test by this push:
     new 7bbb14a12c Fix exception in mini task scheduler. (#24865)
7bbb14a12c is described below

commit 7bbb14a12c45ea727596d48cd27b7239aadcefbf
Author: Ash Berlin-Taylor <as...@apache.org>
AuthorDate: Wed Jul 6 11:34:48 2022 +0100

    Fix exception in mini task scheduler. (#24865)
    
    I introduced a bug in 2.3.0 as part of the dynamic task mapping work
    that frequently made the mini scheduler fail for tasks involving
    XComArgs.
    
    The fix is to alter the logic in BaseOperator's deepcopy to not set the
    `__instantiated` flag until all the other attributes are copied.
    
    For background, the `__instantiated` flag is used so that when you do
    `task.some_attr = an_xcom_arg` the relationships are set appropriately,
    but since we are copying all the existing attributes we don't need to do
    that, as the relationships will already be set!
    
    (cherry picked from commit c23b31cd786760da8a8e39ecbcf2c0d31e50e594)
---
 airflow/models/baseoperator.py    | 11 +++++++----
 tests/models/test_baseoperator.py | 13 +++++++++++++
 2 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 2fea760716..30990880dc 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -1140,11 +1140,10 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
         """
 
     def __deepcopy__(self, memo):
-        """
-        Hack sorting double chained task lists by task_id to avoid hitting
-        max_depth on deepcopy operations.
-        """
+        # Hack sorting double chained task lists by task_id to avoid hitting
+        # max_depth on deepcopy operations.
         sys.setrecursionlimit(5000)  # TODO fix this in a better way
+
         cls = self.__class__
         result = cls.__new__(cls)
         memo[id(self)] = result
@@ -1152,10 +1151,14 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
         shallow_copy = cls.shallow_copy_attrs + cls._base_operator_shallow_copy_attrs
 
         for k, v in self.__dict__.items():
+            if k == "_BaseOperator__instantiated":
+                # Don't set this until the _end_, as it changes behaviour of __setattr__
+                continue
             if k not in shallow_copy:
                 setattr(result, k, copy.deepcopy(v, memo))
             else:
                 setattr(result, k, copy.copy(v))
+        result.__instantiated = self.__instantiated
         return result
 
     def __getstate__(self):
diff --git a/tests/models/test_baseoperator.py b/tests/models/test_baseoperator.py
index 8cb8d96e81..40c228c961 100644
--- a/tests/models/test_baseoperator.py
+++ b/tests/models/test_baseoperator.py
@@ -15,6 +15,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import copy
 import logging
 import uuid
 from datetime import date, datetime, timedelta
@@ -994,3 +995,15 @@ def test_mapped_render_template_fields_validating_operator(dag_maker, session):
     assert op.value == "{{ ds }}", "Should not be templated!"
     assert op.arg1 == "{{ ds }}"
     assert op.arg2 == "a"
+
+
+def test_deepcopy():
+    # Test bug when copying an operator attached to a DAG
+    with DAG("dag0", start_date=DEFAULT_DATE) as dag:
+
+        @dag.task
+        def task0():
+            pass
+
+        MockOperator(task_id="task1", arg1=task0())
+    copy.deepcopy(dag)