You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/09 22:52:23 UTC
[airflow] 25/39: Core: Enable the use of __init_subclass__ in
subclasses of BaseOperator (#17027)
This is an automated email from the ASF dual-hosted git repository.
jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 120cbb9072da993e3f51350097a255ac7e15b2c4
Author: Henry Zhang <me...@henry.dev>
AuthorDate: Thu Jul 22 15:23:51 2021 -0700
Core: Enable the use of __init_subclass__ in subclasses of BaseOperator (#17027)
This fixes a regression in 2.1 where subclasses of BaseOperator could no
longer pass class keyword arguments to `__init_subclass__`, which is used
to customize a subclass at class-definition time.
Related BPO: https://bugs.python.org/issue29581
Fixes: https://github.com/apache/airflow/issues/17014
(cherry picked from commit 901513203f287d4f8152f028e9070a2dec73ad74)
---
airflow/models/baseoperator.py | 4 ++--
tests/models/test_baseoperator.py | 26 ++++++++++++++++++++++++++
2 files changed, 28 insertions(+), 2 deletions(-)
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 1fec8cf..360f264 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -186,8 +186,8 @@ class BaseOperatorMeta(abc.ABCMeta):
return cast(T, apply_defaults)
- def __new__(cls, name, bases, namespace):
- new_cls = super().__new__(cls, name, bases, namespace)
+ def __new__(cls, name, bases, namespace, **kwargs):
+ new_cls = super().__new__(cls, name, bases, namespace, **kwargs)
new_cls.__init__ = cls._apply_defaults(new_cls.__init__)
return new_cls
diff --git a/tests/models/test_baseoperator.py b/tests/models/test_baseoperator.py
index 04d3f54..87a9247 100644
--- a/tests/models/test_baseoperator.py
+++ b/tests/models/test_baseoperator.py
@@ -516,3 +516,29 @@ class TestXComArgsRelationsAreResolved:
with pytest.raises(AirflowException):
op1 = DummyOperator(task_id="op1")
CustomOp(task_id="op2", field=op1.output)
+
+
+class InitSubclassOp(DummyOperator):
+ def __init_subclass__(cls, class_arg=None, **kwargs) -> None:
+ cls._class_arg = class_arg
+ super().__init_subclass__(**kwargs)
+
+ def execute(self, context):
+ self.context_arg = context
+
+
+class TestInitSubclassOperator:
+ def test_init_subclass_args(self):
+ class_arg = "foo"
+ context = {"key": "value"}
+
+ class ConcreteSubclassOp(InitSubclassOp, class_arg=class_arg):
+ pass
+
+ task = ConcreteSubclassOp(task_id="op1")
+ task_copy = task.prepare_for_execution()
+
+ task_copy.execute(context)
+
+ assert task_copy._class_arg == class_arg
+ assert task_copy.context_arg == context