You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/09 22:52:12 UTC

[airflow] 14/39: Validate type of `priority_weight` during parsing (#16765)

This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 4678cf54b5cd651e0232a42746f9be80db43a609
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Jul 2 01:52:50 2021 +0100

    Validate type of `priority_weight` during parsing (#16765)
    
    closes https://github.com/apache/airflow/issues/16762
    
    Without this the scheduler crashes as validation does not happen at DAG Parsing time.
    
    (cherry picked from commit 9d170279a60d9d4ed513bae1c35999926f042662)
---
 airflow/models/baseoperator.py    | 7 ++++++-
 tests/models/test_baseoperator.py | 5 +++++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 10e8bfd..1fec8cf 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -586,10 +586,15 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta
             if isinstance(max_retry_delay, timedelta):
                 self.max_retry_delay = max_retry_delay
             else:
-                self.log.debug("Max_retry_delay isn't timedelta object, assuming secs")
+                self.log.debug("max_retry_delay isn't a timedelta object, assuming secs")
                 self.max_retry_delay = timedelta(seconds=max_retry_delay)
 
         self.params = params or {}  # Available in templates!
+        if priority_weight is not None and not isinstance(priority_weight, int):
+            raise AirflowException(
+                f"`priority_weight` for task '{self.task_id}' only accepts integers, "
+                f"received '{type(priority_weight)}'."
+            )
         self.priority_weight = priority_weight
         if not WeightRule.is_valid(weight_rule):
             raise AirflowException(
diff --git a/tests/models/test_baseoperator.py b/tests/models/test_baseoperator.py
index fa02b4e..04d3f54 100644
--- a/tests/models/test_baseoperator.py
+++ b/tests/models/test_baseoperator.py
@@ -109,6 +109,11 @@ class TestBaseOperator(unittest.TestCase):
         with pytest.raises(AirflowException, match='Argument.*test_param.*required'):
             DummyClass(default_args=default_args)
 
+    def test_incorrect_priority_weight(self):
+        error_msg = "`priority_weight` for task 'test_op' only accepts integers, received '<class 'str'>'."
+        with pytest.raises(AirflowException, match=error_msg):
+            DummyOperator(task_id="test_op", priority_weight="2")
+
     @parameterized.expand(
         [
             ("{{ foo }}", {"foo": "bar"}, "bar"),