You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/10/18 13:10:38 UTC

[airflow] 22/41: Fix warning when using xcomarg dependencies (#26801)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 56f1e473e7c8b9b2fbb2c0f72c6b06596bdb07e3
Author: Ash Berlin-Taylor <as...@apache.org>
AuthorDate: Fri Sep 30 17:03:07 2022 +0100

    Fix warning when using xcomarg dependencies (#26801)
    
    This warning was invisible before 2.4 due to a bug in our logging config
    (fixed by commit 7363e35) and AIP-45 which suddenly made this appear.
    
    The problem was being caused by set_xcomargs_dependencies being called
    once for each class in the hierarchy, and each of them doing the same
    logic.
    
    The fix is to look at the _actual_ function of `self.__init__` and
    compare it to the function we're about to call so that we don't set
    dependencies until we have finished the "outer" most class's
    apply_defaults invocation.
    
    (cherry picked from commit d77f0563b403ae9e1a92e8e9e998a1142bb6f359)
---
 airflow/models/baseoperator.py  |  5 +++--
 tests/decorators/test_python.py | 15 +++++++++++++++
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 3a95a3945c..8bc96df735 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -410,8 +410,9 @@ class BaseOperatorMeta(abc.ABCMeta):
             # Store the args passed to init -- we need them to support task.map serialzation!
             self._BaseOperator__init_kwargs.update(kwargs)  # type: ignore
 
-            if not instantiated_from_mapped:
-                # Set upstream task defined by XComArgs passed to template fields of the operator.
+            # Set upstream task defined by XComArgs passed to template fields of the operator.
+            # BUT: only do this _ONCE_, not once for each class in the hierarchy
+            if not instantiated_from_mapped and func == self.__init__.__wrapped__:  # type: ignore[misc]
                 self.set_xcomargs_dependencies()
                 # Mark instance as instantiated.
                 self._BaseOperator__instantiated = True
diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py
index 3dc1516682..876c4b23d4 100644
--- a/tests/decorators/test_python.py
+++ b/tests/decorators/test_python.py
@@ -817,3 +817,18 @@ def test_upstream_exception_produces_none_xcom(dag_maker, session):
     assert len(decision.schedulable_tis) == 1  # "down"
     decision.schedulable_tis[0].run(session=session)
     assert result == "'example' None"
+
+
+@pytest.mark.filterwarnings("error")
+def test_no_warnings(reset_logging_config, caplog):
+    @task_decorator
+    def some_task():
+        return 1
+
+    @task_decorator
+    def other(x):
+        ...
+
+    with DAG(dag_id='test', start_date=DEFAULT_DATE, schedule=None):
+        other(some_task())
+    assert caplog.messages == []