You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/03/04 18:19:19 UTC

[airflow] branch main updated: Add a check for not templateable fields (#29821)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7963360b8d Add a check for not templateable fields (#29821)
7963360b8d is described below

commit 7963360b8d43a15791a6b7d4335f482fce1d82d2
Author: Hussein Awala <ho...@gmail.com>
AuthorDate: Sat Mar 4 19:19:09 2023 +0100

    Add a check for not templateable fields (#29821)
---
 airflow/serialization/serialized_objects.py   |  6 ++++++
 tests/serialization/test_dag_serialization.py | 18 +++++++++++++++++-
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index db20b61d74..7200094547 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -20,6 +20,7 @@ from __future__ import annotations
 import collections.abc
 import datetime
 import enum
+import inspect
 import logging
 import warnings
 import weakref
@@ -702,6 +703,7 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
     def task_type(self) -> str:
         # Overwrites task_type of BaseOperator to use _task_type instead of
         # __class__.__name__.
+
         return self._task_type
 
     @task_type.setter
@@ -770,8 +772,12 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
 
         # Store all template_fields as they are if there are JSON Serializable
         # If not, store them as strings
+        # Raise an exception if a template field is a BaseOperator.__init__ argument (not templateable)
+        forbidden_fields = set(inspect.signature(BaseOperator.__init__).parameters.keys())
         if op.template_fields:
             for template_field in op.template_fields:
+                if template_field in forbidden_fields:
+                    raise AirflowException(f"Cannot template BaseOperator fields: {template_field}")
                 value = getattr(op, template_field, None)
                 if not cls._is_excluded(value, template_field, op):
                     serialize_op[template_field] = serialize_template_field(value)
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 43ce80c09c..967c433bdc 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -38,7 +38,7 @@ from kubernetes.client import models as k8s
 
 import airflow
 from airflow.datasets import Dataset
-from airflow.exceptions import SerializationError
+from airflow.exceptions import AirflowException, SerializationError
 from airflow.hooks.base import BaseHook
 from airflow.kubernetes.pod_generator import PodGenerator
 from airflow.models import DAG, Connection, DagBag, Operator
@@ -2016,6 +2016,22 @@ class TestStringifiedDAGs:
         assert param.description == "hello"
         assert param.schema == {"type": "string"}
 
+    def test_not_templateable_fields_in_serialized_dag(
+        self,
+    ):
+        """
+        Test that an AirflowException is raised when template_fields contains a non-templateable field.
+        """
+
+        class TestOperator(BaseOperator):
+            template_fields = ("execution_timeout",)
+
+        dag = DAG("test_not_templateable_fields", start_date=datetime(2019, 8, 1))
+        with dag:
+            TestOperator(task_id="test", execution_timeout=timedelta(seconds=10))
+        with pytest.raises(AirflowException, match="Cannot template BaseOperator fields: execution_timeout"):
+            SerializedDAG.to_dict(dag)
+
 
 def test_kubernetes_optional():
     """Serialisation / deserialisation continues to work without kubernetes installed"""