You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/10/18 13:10:38 UTC
[airflow] 22/41: Fix warning when using xcomarg dependencies (#26801)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 56f1e473e7c8b9b2fbb2c0f72c6b06596bdb07e3
Author: Ash Berlin-Taylor <as...@apache.org>
AuthorDate: Fri Sep 30 17:03:07 2022 +0100
Fix warning when using xcomarg dependencies (#26801)
This warning was invisible before 2.4 due to a bug in our logging config
(fixed by commit 7363e35) and AIP-45 which suddenly made this appear.
The problem was being caused by set_xcomargs_dependencies being called
once for each class in the hierarchy, and each of them doing the same
logic.
The fix is to look at the _actual_ function of `self.__init__` and
compare it to the function we're about to call so that we don't set
dependencies until we have finished the "outer" most class's
apply_defaults invocation.
(cherry picked from commit d77f0563b403ae9e1a92e8e9e998a1142bb6f359)
---
airflow/models/baseoperator.py | 5 +++--
tests/decorators/test_python.py | 15 +++++++++++++++
2 files changed, 18 insertions(+), 2 deletions(-)
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 3a95a3945c..8bc96df735 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -410,8 +410,9 @@ class BaseOperatorMeta(abc.ABCMeta):
# Store the args passed to init -- we need them to support task.map serialzation!
self._BaseOperator__init_kwargs.update(kwargs) # type: ignore
- if not instantiated_from_mapped:
- # Set upstream task defined by XComArgs passed to template fields of the operator.
+ # Set upstream task defined by XComArgs passed to template fields of the operator.
+ # BUT: only do this _ONCE_, not once for each class in the hierarchy
+ if not instantiated_from_mapped and func == self.__init__.__wrapped__: # type: ignore[misc]
self.set_xcomargs_dependencies()
# Mark instance as instantiated.
self._BaseOperator__instantiated = True
diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py
index 3dc1516682..876c4b23d4 100644
--- a/tests/decorators/test_python.py
+++ b/tests/decorators/test_python.py
@@ -817,3 +817,18 @@ def test_upstream_exception_produces_none_xcom(dag_maker, session):
assert len(decision.schedulable_tis) == 1 # "down"
decision.schedulable_tis[0].run(session=session)
assert result == "'example' None"
+
+
+@pytest.mark.filterwarnings("error")
+def test_no_warnings(reset_logging_config, caplog):
+ @task_decorator
+ def some_task():
+ return 1
+
+ @task_decorator
+ def other(x):
+ ...
+
+ with DAG(dag_id='test', start_date=DEFAULT_DATE, schedule=None):
+ other(some_task())
+ assert caplog.messages == []