You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/09 22:52:12 UTC
[airflow] 14/39: Validate type of `priority_weight` during parsing
(#16765)
This is an automated email from the ASF dual-hosted git repository.
jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 4678cf54b5cd651e0232a42746f9be80db43a609
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Jul 2 01:52:50 2021 +0100
Validate type of `priority_weight` during parsing (#16765)
closes https://github.com/apache/airflow/issues/16762
Without this the scheduler crashes as validation does not happen at DAG Parsing time.
(cherry picked from commit 9d170279a60d9d4ed513bae1c35999926f042662)
---
airflow/models/baseoperator.py | 7 ++++++-
tests/models/test_baseoperator.py | 5 +++++
2 files changed, 11 insertions(+), 1 deletion(-)
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 10e8bfd..1fec8cf 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -586,10 +586,15 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta
if isinstance(max_retry_delay, timedelta):
self.max_retry_delay = max_retry_delay
else:
- self.log.debug("Max_retry_delay isn't timedelta object, assuming secs")
+ self.log.debug("max_retry_delay isn't a timedelta object, assuming secs")
self.max_retry_delay = timedelta(seconds=max_retry_delay)
self.params = params or {} # Available in templates!
+ if priority_weight is not None and not isinstance(priority_weight, int):
+ raise AirflowException(
+ f"`priority_weight` for task '{self.task_id}' only accepts integers, "
+ f"received '{type(priority_weight)}'."
+ )
self.priority_weight = priority_weight
if not WeightRule.is_valid(weight_rule):
raise AirflowException(
diff --git a/tests/models/test_baseoperator.py b/tests/models/test_baseoperator.py
index fa02b4e..04d3f54 100644
--- a/tests/models/test_baseoperator.py
+++ b/tests/models/test_baseoperator.py
@@ -109,6 +109,11 @@ class TestBaseOperator(unittest.TestCase):
with pytest.raises(AirflowException, match='Argument.*test_param.*required'):
DummyClass(default_args=default_args)
+ def test_incorrect_priority_weight(self):
+ error_msg = "`priority_weight` for task 'test_op' only accepts integers, received '<class 'str'>'."
+ with pytest.raises(AirflowException, match=error_msg):
+ DummyOperator(task_id="test_op", priority_weight="2")
+
@parameterized.expand(
[
("{{ foo }}", {"foo": "bar"}, "bar"),