You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2023/03/07 16:16:24 UTC

[airflow] 17/23: Add a check for not templateable fields (#29821)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 5add348e59c5ada75a3d73976090d4a6b6d43e6e
Author: Hussein Awala <ho...@gmail.com>
AuthorDate: Sat Mar 4 19:19:09 2023 +0100

    Add a check for not templateable fields (#29821)
    
    (cherry picked from commit 7963360b8d43a15791a6b7d4335f482fce1d82d2)
---
 airflow/serialization/serialized_objects.py   |  6 ++++++
 tests/serialization/test_dag_serialization.py | 18 +++++++++++++++++-
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 6cb33cd203..e3bc785701 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -20,6 +20,7 @@ from __future__ import annotations
 import collections.abc
 import datetime
 import enum
+import inspect
 import logging
 import warnings
 import weakref
@@ -701,6 +702,7 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
     def task_type(self) -> str:
         # Overwrites task_type of BaseOperator to use _task_type instead of
         # __class__.__name__.
+
         return self._task_type
 
     @task_type.setter
@@ -769,8 +771,12 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
 
         # Store all template_fields as they are if there are JSON Serializable
         # If not, store them as strings
+        # And raise an exception if the field is not templateable
+        forbidden_fields = set(inspect.signature(BaseOperator.__init__).parameters.keys())
         if op.template_fields:
             for template_field in op.template_fields:
+                if template_field in forbidden_fields:
+                    raise AirflowException(f"Cannot template BaseOperator fields: {template_field}")
                 value = getattr(op, template_field, None)
                 if not cls._is_excluded(value, template_field, op):
                     serialize_op[template_field] = serialize_template_field(value)
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index ec07d60954..305efd2ea0 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -38,7 +38,7 @@ from kubernetes.client import models as k8s
 
 import airflow
 from airflow.datasets import Dataset
-from airflow.exceptions import SerializationError
+from airflow.exceptions import AirflowException, SerializationError
 from airflow.hooks.base import BaseHook
 from airflow.kubernetes.pod_generator import PodGenerator
 from airflow.models import DAG, Connection, DagBag, Operator
@@ -1867,6 +1867,22 @@ class TestStringifiedDAGs:
         assert param.description == "hello"
         assert param.schema == {"type": "string"}
 
+    def test_not_templateable_fields_in_serialized_dag(
+        self,
+    ):
+        """
+        Test that when we use  not templateable fields, an Airflow exception is raised.
+        """
+
+        class TestOperator(BaseOperator):
+            template_fields = ("execution_timeout",)
+
+        dag = DAG("test_not_templateable_fields", start_date=datetime(2019, 8, 1))
+        with dag:
+            TestOperator(task_id="test", execution_timeout=timedelta(seconds=10))
+        with pytest.raises(AirflowException, match="Cannot template BaseOperator fields: execution_timeout"):
+            SerializedDAG.to_dict(dag)
+
 
 def test_kubernetes_optional():
     """Serialisation / deserialisation continues to work without kubernetes installed"""