You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2023/03/07 16:16:24 UTC
[airflow] 17/23: Add a check for not templateable fields (#29821)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 5add348e59c5ada75a3d73976090d4a6b6d43e6e
Author: Hussein Awala <ho...@gmail.com>
AuthorDate: Sat Mar 4 19:19:09 2023 +0100
Add a check for not templateable fields (#29821)
(cherry picked from commit 7963360b8d43a15791a6b7d4335f482fce1d82d2)
---
airflow/serialization/serialized_objects.py | 6 ++++++
tests/serialization/test_dag_serialization.py | 18 +++++++++++++++++-
2 files changed, 23 insertions(+), 1 deletion(-)
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 6cb33cd203..e3bc785701 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -20,6 +20,7 @@ from __future__ import annotations
import collections.abc
import datetime
import enum
+import inspect
import logging
import warnings
import weakref
@@ -701,6 +702,7 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
def task_type(self) -> str:
# Overwrites task_type of BaseOperator to use _task_type instead of
# __class__.__name__.
+
return self._task_type
@task_type.setter
@@ -769,8 +771,12 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
# Store all template_fields as they are if there are JSON Serializable
# If not, store them as strings
+ # And raise an exception if the field is not templateable
+ forbidden_fields = set(inspect.signature(BaseOperator.__init__).parameters.keys())
if op.template_fields:
for template_field in op.template_fields:
+ if template_field in forbidden_fields:
+ raise AirflowException(f"Cannot template BaseOperator fields: {template_field}")
value = getattr(op, template_field, None)
if not cls._is_excluded(value, template_field, op):
serialize_op[template_field] = serialize_template_field(value)
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index ec07d60954..305efd2ea0 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -38,7 +38,7 @@ from kubernetes.client import models as k8s
import airflow
from airflow.datasets import Dataset
-from airflow.exceptions import SerializationError
+from airflow.exceptions import AirflowException, SerializationError
from airflow.hooks.base import BaseHook
from airflow.kubernetes.pod_generator import PodGenerator
from airflow.models import DAG, Connection, DagBag, Operator
@@ -1867,6 +1867,22 @@ class TestStringifiedDAGs:
assert param.description == "hello"
assert param.schema == {"type": "string"}
+ def test_not_templateable_fields_in_serialized_dag(
+ self,
+ ):
+ """
+ Test that when we use not templateable fields, an Airflow exception is raised.
+ """
+
+ class TestOperator(BaseOperator):
+ template_fields = ("execution_timeout",)
+
+ dag = DAG("test_not_templateable_fields", start_date=datetime(2019, 8, 1))
+ with dag:
+ TestOperator(task_id="test", execution_timeout=timedelta(seconds=10))
+ with pytest.raises(AirflowException, match="Cannot template BaseOperator fields: execution_timeout"):
+ SerializedDAG.to_dict(dag)
+
def test_kubernetes_optional():
"""Serialisation / deserialisation continues to work without kubernetes installed"""