You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/12/05 02:51:42 UTC
[airflow] branch main updated: Allow longer pod names for k8s executor / KPO (#27736)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d93240696b Allow longer pod names for k8s executor / KPO (#27736)
d93240696b is described below
commit d93240696beeca7d28542d0fe0b53871b3d6612c
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Sun Dec 4 18:51:35 2022 -0800
Allow longer pod names for k8s executor / KPO (#27736)
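Previously this limited the pod name length to 63 characters, but 63 is the limit for label values, not for pod names (pod names may be up to 253 characters). I assume that limit was just a historical leftover (an atavism). Meanwhile, a UUID was previously used as the uniqueness suffix; it takes up 32 characters, much more space than necessary. We now use 8 random lowercase alphanumeric characters instead, which should be plenty.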
---
airflow/kubernetes/kubernetes_helper_functions.py | 42 +++++-
airflow/kubernetes/pod_generator.py | 35 +++--
.../cncf/kubernetes/operators/kubernetes_pod.py | 85 ++++++++----
tests/kubernetes/models/test_secret.py | 18 +--
.../kubernetes/test_kubernetes_helper_functions.py | 132 +++++++++++--------
tests/kubernetes/test_pod_generator.py | 145 ++++++++++-----------
.../kubernetes/operators/test_kubernetes_pod.py | 37 ++----
7 files changed, 285 insertions(+), 209 deletions(-)
diff --git a/airflow/kubernetes/kubernetes_helper_functions.py b/airflow/kubernetes/kubernetes_helper_functions.py
index 1f0c809cf9..6422cb35b4 100644
--- a/airflow/kubernetes/kubernetes_helper_functions.py
+++ b/airflow/kubernetes/kubernetes_helper_functions.py
@@ -17,6 +17,8 @@
from __future__ import annotations
import logging
+import secrets
+import string
import pendulum
from slugify import slugify
@@ -25,17 +27,41 @@ from airflow.models.taskinstance import TaskInstanceKey
log = logging.getLogger(__name__)
+alphanum_lower = string.ascii_lowercase + string.digits
-def create_pod_id(dag_id: str | None = None, task_id: str | None = None) -> str:
+
+def rand_str(num):
+ """Generate random lowercase alphanumeric string of length num.
+
+ :meta private:
+ """
+ return "".join(secrets.choice(alphanum_lower) for _ in range(num))
+
+
+def add_pod_suffix(*, pod_name, rand_len=8, max_len=80):
+ """Add random string to pod name while staying under max len"""
+ suffix = "-" + rand_str(rand_len)
+ return pod_name[: max_len - len(suffix)].strip("-.") + suffix
+
+
+def create_pod_id(
+ dag_id: str | None = None,
+ task_id: str | None = None,
+ *,
+ max_length: int = 80,
+ unique: bool = True,
+) -> str:
"""
- Generates the kubernetes safe pod_id. Note that this is
- NOT the full ID that will be launched to k8s. We will add a uuid
- to ensure uniqueness.
+ Generates unique pod ID given a dag_id and / or task_id.
:param dag_id: DAG ID
:param task_id: Task ID
- :return: The non-unique pod_id for this task/DAG pairing
+ :param max_length: max number of characters
+ :param unique: whether a random string suffix should be added
+ :return: A valid identifier for a kubernetes pod name
"""
+ if not (dag_id or task_id):
+ raise ValueError("Must supply either dag_id or task_id.")
name = ""
if dag_id:
name += dag_id
@@ -43,7 +69,11 @@ def create_pod_id(dag_id: str | None = None, task_id: str | None = None) -> str:
if name:
name += "-"
name += task_id
- return slugify(name, lowercase=True)[:253].strip("-.")
+ base_name = slugify(name, lowercase=True)[:max_length].strip(".-")
+ if unique:
+ return add_pod_suffix(pod_name=base_name, rand_len=8, max_len=max_length)
+ else:
+ return base_name
def annotations_to_key(annotations: dict[str, str]) -> TaskInstanceKey:
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 4382b390aa..7af1632559 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -28,7 +28,6 @@ import hashlib
import logging
import os
import re
-import uuid
import warnings
from functools import reduce
@@ -37,6 +36,7 @@ from kubernetes.client import models as k8s
from kubernetes.client.api_client import ApiClient
from airflow.exceptions import AirflowConfigException, PodReconciliationError, RemovedInAirflow3Warning
+from airflow.kubernetes.kubernetes_helper_functions import add_pod_suffix, rand_str
from airflow.kubernetes.pod_generator_deprecated import PodDefaults, PodGenerator as PodGeneratorDeprecated
from airflow.utils import yaml
from airflow.version import version as airflow_version
@@ -125,9 +125,10 @@ class PodGenerator:
def gen_pod(self) -> k8s.V1Pod:
"""Generates pod"""
+ warnings.warn("This function is deprecated. ", RemovedInAirflow3Warning)
result = self.ud_pod
- result.metadata.name = self.make_unique_pod_id(result.metadata.name)
+ result.metadata.name = add_pod_suffix(pod_name=result.metadata.name)
if self.extract_xcom:
result = self.add_xcom_sidecar(result)
@@ -340,6 +341,11 @@ class PodGenerator:
- executor_config
- dynamic arguments
"""
+ if len(pod_id) > 253:
+ warnings.warn(
+ "pod_id supplied is longer than 253 characters; truncating and adding unique suffix."
+ )
+ pod_id = add_pod_suffix(pod_name=pod_id, max_len=253)
try:
image = pod_override_object.spec.containers[0].image # type: ignore
if not image:
@@ -374,7 +380,7 @@ class PodGenerator:
metadata=k8s.V1ObjectMeta(
namespace=namespace,
annotations=annotations,
- name=PodGenerator.make_unique_pod_id(pod_id),
+ name=pod_id,
labels=labels,
),
spec=k8s.V1PodSpec(
@@ -445,7 +451,7 @@ class PodGenerator:
r"""
Kubernetes pod names must consist of one or more lowercase
rfc1035/rfc1123 labels separated by '.' with a maximum length of 253
- characters. Each label has a maximum length of 63 characters.
+ characters.
Name must pass the following regex for validation
``^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$``
@@ -453,21 +459,22 @@ class PodGenerator:
For more details, see:
https://github.com/kubernetes/kubernetes/blob/release-1.1/docs/design/identifiers.md
- :param pod_id: a dag_id with only alphanumeric characters
+ :param pod_id: requested pod name
:return: ``str`` valid Pod name of appropriate length
"""
+ warnings.warn(
+ "This function is deprecated. Use `add_pod_suffix` in `kubernetes_helper_functions`.",
+ RemovedInAirflow3Warning,
+ )
+
if not pod_id:
return None
- safe_uuid = uuid.uuid4().hex # safe uuid will always be less than 63 chars
-
- # Get prefix length after subtracting the uuid length. Clean up '.' and '-' from
- # end of podID ('.' can't be followed by '-').
- label_prefix_length = MAX_LABEL_LEN - len(safe_uuid) - 1 # -1 for separator
- trimmed_pod_id = pod_id[:label_prefix_length].rstrip("-.")
-
- # previously used a '.' as the separator, but this could create errors in some situations
- return f"{trimmed_pod_id}-{safe_uuid}"
+ max_pod_id_len = 100 # arbitrarily chosen
+ suffix = rand_str(8) # 8 seems good enough
+ base_pod_id_len = max_pod_id_len - len(suffix) - 1 # -1 for separator
+ trimmed_pod_id = pod_id[:base_pod_id_len].rstrip("-.")
+ return f"{trimmed_pod_id}-{suffix}"
def merge_objects(base_obj, client_obj):
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index 990015b625..6aafce2f12 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -20,11 +20,14 @@ from __future__ import annotations
import json
import logging
import re
+import secrets
+import string
import warnings
from contextlib import AbstractContextManager
from typing import TYPE_CHECKING, Any, Sequence
from kubernetes.client import CoreV1Api, models as k8s
+from slugify import slugify
from airflow.compat.functools import cached_property
from airflow.exceptions import AirflowException
@@ -61,30 +64,62 @@ if TYPE_CHECKING:
from airflow.utils.context import Context
+alphanum_lower = string.ascii_lowercase + string.digits
-def _task_id_to_pod_name(val: str) -> str:
+
+def _rand_str(num):
+ """Generate random lowercase alphanumeric string of length num.
+
+ TODO: when min airflow version >= 2.5, delete this function and import from kubernetes_helper_functions.
+
+ :meta private:
+ """
+ return "".join(secrets.choice(alphanum_lower) for _ in range(num))
+
+
+def _add_pod_suffix(*, pod_name, rand_len=8, max_len=253):
+ """Add random string to pod name while staying under max len
+
+ TODO: when min airflow version >= 2.5, delete this function and import from kubernetes_helper_functions.
+
+ :meta private:
"""
- Given a task_id, convert it to a pod name.
- Adds a 0 if start or end char is invalid.
- Replaces any other invalid char with `-`.
+ suffix = "-" + _rand_str(rand_len)
+ return pod_name[: max_len - len(suffix)].strip("-.") + suffix
- :param val: non-empty string, presumed to be a task id
- :return valid kubernetes object name.
+
+def _create_pod_id(
+ dag_id: str | None = None,
+ task_id: str | None = None,
+ *,
+ max_length: int = 80,
+ unique: bool = True,
+) -> str:
"""
- if not val:
- raise ValueError("_task_id_to_pod_name requires non-empty string.")
- val = val.lower()
- if not re.match(r"[a-z0-9]", val[0]):
- val = f"0{val}"
- if not re.match(r"[a-z0-9]", val[-1]):
- val = f"{val}0"
- val = re.sub(r"[^a-z0-9\-.]", "-", val)
- if len(val) > 253:
- raise ValueError(
- f"Pod name {val} is longer than 253 characters. "
- "See https://kubernetes.io/docs/concepts/overview/working-with-objects/names/."
- )
- return val
+ Generates unique pod ID given a dag_id and / or task_id.
+
+ TODO: when min airflow version >= 2.5, delete this function and import from kubernetes_helper_functions.
+
+ :param dag_id: DAG ID
+ :param task_id: Task ID
+ :param max_length: max number of characters
+ :param unique: whether a random string suffix should be added
+ :return: A valid identifier for a kubernetes pod name
+ """
+ if not (dag_id or task_id):
+ raise ValueError("Must supply either dag_id or task_id.")
+ name = ""
+ if dag_id:
+ name += dag_id
+ if task_id:
+ if name:
+ name += "-"
+ name += task_id
+ base_name = slugify(name, lowercase=True)[:max_length].strip(".-")
+ if unique:
+ return _add_pod_suffix(pod_name=base_name, max_len=max_length)
+ else:
+ return base_name
class PodReattachFailure(AirflowException):
@@ -198,7 +233,7 @@ class KubernetesPodOperator(BaseOperator):
namespace: str | None = None,
image: str | None = None,
name: str | None = None,
- random_name_suffix: bool | None = True,
+ random_name_suffix: bool = True,
cmds: list[str] | None = None,
arguments: list[str] | None = None,
ports: list[k8s.V1ContainerPort] | None = None,
@@ -613,10 +648,10 @@ class KubernetesPodOperator(BaseOperator):
pod = PodGenerator.reconcile_pods(pod_template, pod)
if not pod.metadata.name:
- pod.metadata.name = _task_id_to_pod_name(self.task_id)
-
- if self.random_name_suffix:
- pod.metadata.name = PodGenerator.make_unique_pod_id(pod.metadata.name)
+ pod.metadata.name = _create_pod_id(task_id=self.task_id, unique=self.random_name_suffix)
+ elif self.random_name_suffix:
+ # user has supplied pod name, we're just adding suffix
+ pod.metadata.name = _add_pod_suffix(pod_name=pod.metadata.name)
if not pod.metadata.namespace:
# todo: replace with call to `hook.get_namespace` in 6.0, when it doesn't default to `default`.
diff --git a/tests/kubernetes/models/test_secret.py b/tests/kubernetes/models/test_secret.py
index 2a072d0401..df7e4b7817 100644
--- a/tests/kubernetes/models/test_secret.py
+++ b/tests/kubernetes/models/test_secret.py
@@ -63,11 +63,14 @@ class TestSecret:
)
@mock.patch("uuid.uuid4")
- def test_attach_to_pod(self, mock_uuid):
+ @mock.patch("airflow.kubernetes.pod_generator.rand_str")
+ def test_attach_to_pod(self, mock_rand_str, mock_uuid):
static_uuid = uuid.UUID("cf4a56d2-8101-4217-b027-2af6216feb48")
mock_uuid.return_value = static_uuid
+ rand_str = "abcd1234"
+ mock_rand_str.return_value = rand_str
path = sys.path[0] + "/tests/kubernetes/pod_generator_base.yaml"
- pod = PodGenerator(pod_template_file=path).gen_pod()
+ pod = PodGenerator(pod_template_file=path).ud_pod
secrets = [
# This should be a secretRef
Secret("env", None, "secret_a"),
@@ -84,7 +87,7 @@ class TestSecret:
"kind": "Pod",
"metadata": {
"labels": {"app": "myapp"},
- "name": "myapp-pod-cf4a56d281014217b0272af6216feb48",
+ "name": "myapp-pod",
"namespace": "default",
},
"spec": {
@@ -108,7 +111,6 @@ class TestSecret:
"ports": [{"containerPort": 1234, "name": "foo"}],
"resources": {"limits": {"memory": "200Mi"}, "requests": {"memory": "100Mi"}},
"volumeMounts": [
- {"mountPath": "/airflow/xcom", "name": "xcom"},
{
"mountPath": "/etc/foo",
"name": "secretvol" + str(static_uuid),
@@ -116,19 +118,11 @@ class TestSecret:
},
],
},
- {
- "command": ["sh", "-c", 'trap "exit 0" INT; while true; do sleep 30; done;'],
- "image": "alpine",
- "name": "airflow-xcom-sidecar",
- "resources": {"requests": {"cpu": "1m"}},
- "volumeMounts": [{"mountPath": "/airflow/xcom", "name": "xcom"}],
- },
],
"hostNetwork": True,
"imagePullSecrets": [{"name": "pull_secret_a"}, {"name": "pull_secret_b"}],
"securityContext": {"fsGroup": 2000, "runAsUser": 1000},
"volumes": [
- {"emptyDir": {}, "name": "xcom"},
{"name": "secretvol" + str(static_uuid), "secret": {"secretName": "secret_b"}},
],
},
diff --git a/tests/kubernetes/test_kubernetes_helper_functions.py b/tests/kubernetes/test_kubernetes_helper_functions.py
index b90125aeb8..43bdab47e1 100644
--- a/tests/kubernetes/test_kubernetes_helper_functions.py
+++ b/tests/kubernetes/test_kubernetes_helper_functions.py
@@ -20,67 +20,97 @@ from __future__ import annotations
import re
import pytest
+from pytest import param
from airflow.kubernetes.kubernetes_helper_functions import create_pod_id
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import _create_pod_id
pod_name_regex = r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$"
+# todo: when cncf provider min airflow version >= 2.5 remove this parameterization
+# we added this function to provider temporarily until min airflow version catches up
+# meanwhile, we use this one test to test both core and provider
@pytest.mark.parametrize(
- "val, expected",
- [
- ("task-id", "task-id"), # no problem
- ("task_id", "task-id"), # underscores
- ("---task.id---", "task-id"), # dots
- (".task.id", "task-id"), # leading dot invalid
- ("**task.id", "task-id"), # leading dot invalid
- ("-90Abc*&", "90abc"), # invalid ends
- ("90AçLbˆˆç˙ßߘ˜˙c*a", "90aclb-c-ssss-c-a"), # weird unicode
- ],
+ "create_pod_id", [param(_create_pod_id, id="provider"), param(create_pod_id, id="core")]
)
-def test_create_pod_id_task_only(val, expected):
- actual = create_pod_id(task_id=val)
- assert actual == expected
- assert re.match(pod_name_regex, actual)
+class TestCreatePodId:
+ @pytest.mark.parametrize(
+ "val, expected",
+ [
+ ("task-id", "task-id"), # no problem
+ ("task_id", "task-id"), # underscores
+ ("---task.id---", "task-id"), # dots
+ (".task.id", "task-id"), # leading dot invalid
+ ("**task.id", "task-id"), # leading dot invalid
+ ("-90Abc*&", "90abc"), # invalid ends
+ ("90AçLbˆˆç˙ßߘ˜˙c*a", "90aclb-c-ssss-c-a"), # weird unicode
+ ],
+ )
+ def test_create_pod_id_task_only(self, val, expected, create_pod_id):
+ actual = create_pod_id(task_id=val, unique=False)
+ assert actual == expected
+ assert re.match(pod_name_regex, actual)
+ @pytest.mark.parametrize(
+ "val, expected",
+ [
+ ("dag-id", "dag-id"), # no problem
+ ("dag_id", "dag-id"), # underscores
+ ("---dag.id---", "dag-id"), # dots
+ (".dag.id", "dag-id"), # leading dot invalid
+ ("**dag.id", "dag-id"), # leading dot invalid
+ ("-90Abc*&", "90abc"), # invalid ends
+ ("90AçLbˆˆç˙ßߘ˜˙c*a", "90aclb-c-ssss-c-a"), # weird unicode
+ ],
+ )
+ def test_create_pod_id_dag_only(self, val, expected, create_pod_id):
+ actual = create_pod_id(dag_id=val, unique=False)
+ assert actual == expected
+ assert re.match(pod_name_regex, actual)
-@pytest.mark.parametrize(
- "val, expected",
- [
- ("dag-id", "dag-id"), # no problem
- ("dag_id", "dag-id"), # underscores
- ("---dag.id---", "dag-id"), # dots
- (".dag.id", "dag-id"), # leading dot invalid
- ("**dag.id", "dag-id"), # leading dot invalid
- ("-90Abc*&", "90abc"), # invalid ends
- ("90AçLbˆˆç˙ßߘ˜˙c*a", "90aclb-c-ssss-c-a"), # weird unicode
- ],
-)
-def test_create_pod_id_dag_only(val, expected):
- actual = create_pod_id(dag_id=val)
- assert actual == expected
- assert re.match(pod_name_regex, actual)
-
+ @pytest.mark.parametrize(
+ "dag_id, task_id, expected",
+ [
+ ("dag-id", "task-id", "dag-id-task-id"), # no problem
+ ("dag_id", "task_id", "dag-id-task-id"), # underscores
+ ("dag.id", "task.id", "dag-id-task-id"), # dots
+ (".dag.id", ".---task.id", "dag-id-task-id"), # leading dot invalid
+ ("**dag.id", "**task.id", "dag-id-task-id"), # leading dot invalid
+ ("-90Abc*&", "-90Abc*&", "90abc-90abc"), # invalid ends
+ ("90AçLbˆˆç˙ßߘ˜˙c*a", "90AçLbˆˆç˙ßߘ˜˙c*a", "90aclb-c-ssss-c-a-90aclb-c-ssss-c-a"), # ugly
+ ],
+ )
+ def test_create_pod_id_dag_and_task(self, dag_id, task_id, expected, create_pod_id):
+ actual = create_pod_id(dag_id=dag_id, task_id=task_id, unique=False)
+ assert actual == expected
+ assert re.match(pod_name_regex, actual)
-@pytest.mark.parametrize(
- "dag_id, task_id, expected",
- [
- ("dag-id", "task-id", "dag-id-task-id"), # no problem
- ("dag_id", "task_id", "dag-id-task-id"), # underscores
- ("dag.id", "task.id", "dag-id-task-id"), # dots
- (".dag.id", ".---task.id", "dag-id-task-id"), # leading dot invalid
- ("**dag.id", "**task.id", "dag-id-task-id"), # leading dot invalid
- ("-90Abc*&", "-90Abc*&", "90abc-90abc"), # invalid ends
- ("90AçLbˆˆç˙ßߘ˜˙c*a", "90AçLbˆˆç˙ßߘ˜˙c*a", "90aclb-c-ssss-c-a-90aclb-c-ssss-c-a"), # ugly
- ],
-)
-def test_create_pod_id_dag_and_task(dag_id, task_id, expected):
- actual = create_pod_id(dag_id=dag_id, task_id=task_id)
- assert actual == expected
- assert re.match(pod_name_regex, actual)
+ def test_create_pod_id_dag_too_long_with_suffix(self, create_pod_id):
+ actual = create_pod_id("0" * 254)
+ assert re.match(r"0{71}-[a-z0-9]{8}", actual)
+ assert re.match(pod_name_regex, actual)
+ def test_create_pod_id_dag_too_long_non_unique(self, create_pod_id):
+ actual = create_pod_id("0" * 254, unique=False)
+ assert re.match(r"0{80}", actual)
+ assert re.match(pod_name_regex, actual)
-def test_create_pod_id_dag_too_long():
- actual = create_pod_id("0" * 254)
- assert actual == "0" * 253
- assert re.match(pod_name_regex, actual)
+ @pytest.mark.parametrize("unique", [True, False])
+ @pytest.mark.parametrize("length", [25, 100, 200, 300])
+ def test_create_pod_id(self, create_pod_id, length, unique):
+ """Test behavior of max_length and unique."""
+ dag_id = "dag-dag-dag-dag-dag-dag-dag-dag-dag-dag-dag-dag-dag-dag-dag-dag-"
+ task_id = "task-task-task-task-task-task-task-task-task-task-task-task-task-task-task-task-task-"
+ actual = create_pod_id(
+ dag_id=dag_id,
+ task_id=task_id,
+ max_length=length,
+ unique=unique,
+ )
+ base = f"{dag_id}{task_id}".strip("-")
+ if unique:
+ assert actual[:-9] == base[: length - 9].strip("-")
+ assert re.match(r"-[a-z0-9]{8}", actual[-9:])
+ else:
+ assert actual == base[:length]
diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py
index bae79fd2a5..1314f763e8 100644
--- a/tests/kubernetes/test_pod_generator.py
+++ b/tests/kubernetes/test_pod_generator.py
@@ -19,14 +19,13 @@ from __future__ import annotations
import os
import re
import sys
-import uuid
from unittest import mock
from unittest.mock import MagicMock
import pytest
from dateutil import parser
from kubernetes.client import ApiClient, models as k8s
-from parameterized import parameterized
+from pytest import param
from airflow import __version__
from airflow.exceptions import AirflowConfigException, PodReconciliationError
@@ -42,7 +41,7 @@ from airflow.kubernetes.secret import Secret
class TestPodGenerator:
def setup_method(self):
- self.static_uuid = uuid.UUID("cf4a56d2-8101-4217-b027-2af6216feb48")
+ self.rand_str = "abcd1234"
self.deserialize_result = {
"apiVersion": "v1",
"kind": "Pod",
@@ -92,7 +91,7 @@ class TestPodGenerator:
}
self.metadata = {
"labels": self.labels,
- "name": "pod_id-" + self.static_uuid.hex,
+ "name": "pod_id-" + self.rand_str,
"namespace": "namespace",
"annotations": self.annotations,
}
@@ -112,7 +111,7 @@ class TestPodGenerator:
kind="Pod",
metadata=k8s.V1ObjectMeta(
namespace="default",
- name="myapp-pod-" + self.static_uuid.hex,
+ name="myapp-pod-" + self.rand_str,
labels={"app": "myapp"},
),
spec=k8s.V1PodSpec(
@@ -160,14 +159,17 @@ class TestPodGenerator:
),
)
- @mock.patch("uuid.uuid4")
- def test_gen_pod_extract_xcom(self, mock_uuid):
- mock_uuid.return_value = self.static_uuid
+ @mock.patch("airflow.kubernetes.kubernetes_helper_functions.rand_str")
+ def test_gen_pod_extract_xcom(self, mock_rand_str):
+ """
+ Method gen_pod is used nowhere in codebase and is deprecated.
+ This test is only retained for backcompat.
+ """
+ mock_rand_str.return_value = self.rand_str
path = sys.path[0] + "/tests/kubernetes/pod_generator_base_with_secrets.yaml"
pod_generator = PodGenerator(pod_template_file=path, extract_xcom=True)
result = pod_generator.gen_pod()
- result_dict = self.k8s_client.sanitize_for_serialization(result)
container_two = {
"name": "airflow-xcom-sidecar",
"image": "alpine",
@@ -189,7 +191,6 @@ class TestPodGenerator:
)
result_dict = self.k8s_client.sanitize_for_serialization(result)
expected_dict = self.k8s_client.sanitize_for_serialization(self.expected)
-
assert result_dict == expected_dict
def test_from_obj(self):
@@ -322,18 +323,11 @@ class TestPodGenerator:
},
} == result
- @mock.patch("uuid.uuid4")
- def test_reconcile_pods_empty_mutator_pod(self, mock_uuid):
- mock_uuid.return_value = self.static_uuid
+ def test_reconcile_pods_empty_mutator_pod(self):
path = sys.path[0] + "/tests/kubernetes/pod_generator_base_with_secrets.yaml"
-
pod_generator = PodGenerator(pod_template_file=path, extract_xcom=True)
- base_pod = pod_generator.gen_pod()
+ base_pod = pod_generator.ud_pod
mutator_pod = None
- name = "name1-" + self.static_uuid.hex
-
- base_pod.metadata.name = name
-
result = PodGenerator.reconcile_pods(base_pod, mutator_pod)
assert base_pod == result
@@ -341,12 +335,12 @@ class TestPodGenerator:
result = PodGenerator.reconcile_pods(base_pod, mutator_pod)
assert base_pod == result
- @mock.patch("uuid.uuid4")
- def test_reconcile_pods(self, mock_uuid):
- mock_uuid.return_value = self.static_uuid
+ @mock.patch("airflow.kubernetes.kubernetes_helper_functions.rand_str")
+ def test_reconcile_pods(self, mock_rand_str):
+ mock_rand_str.return_value = self.rand_str
path = sys.path[0] + "/tests/kubernetes/pod_generator_base_with_secrets.yaml"
- base_pod = PodGenerator(pod_template_file=path, extract_xcom=False).gen_pod()
+ base_pod = PodGenerator(pod_template_file=path, extract_xcom=False).ud_pod
mutator_pod = k8s.V1Pod(
metadata=k8s.V1ObjectMeta(
@@ -400,15 +394,13 @@ class TestPodGenerator:
@pytest.mark.parametrize(
"config_image, expected_image",
[
- pytest.param("my_image:my_tag", "my_image:my_tag", id="image_in_cfg"),
- pytest.param(None, "busybox", id="no_image_in_cfg"),
+ param("my_image:my_tag", "my_image:my_tag", id="image_in_cfg"),
+ param(None, "busybox", id="no_image_in_cfg"),
],
)
- @mock.patch("uuid.uuid4")
- def test_construct_pod(self, mock_uuid, config_image, expected_image):
+ def test_construct_pod(self, config_image, expected_image):
template_file = sys.path[0] + "/tests/kubernetes/pod_generator_base_with_secrets.yaml"
worker_config = PodGenerator.deserialize_model_file(template_file)
- mock_uuid.return_value = self.static_uuid
executor_config = k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
@@ -436,7 +428,7 @@ class TestPodGenerator:
expected.metadata.labels = self.labels
expected.metadata.labels["app"] = "myapp"
expected.metadata.annotations = self.annotations
- expected.metadata.name = "pod_id-" + self.static_uuid.hex
+ expected.metadata.name = "pod_id"
expected.metadata.namespace = "test_namespace"
expected.spec.containers[0].args = ["command"]
expected.spec.containers[0].image = expected_image
@@ -452,12 +444,9 @@ class TestPodGenerator:
assert expected_dict == result_dict
- @mock.patch("uuid.uuid4")
- def test_construct_pod_mapped_task(self, mock_uuid):
+ def test_construct_pod_mapped_task(self):
template_file = sys.path[0] + "/tests/kubernetes/pod_generator_base.yaml"
worker_config = PodGenerator.deserialize_model_file(template_file)
- mock_uuid.return_value = self.static_uuid
-
result = PodGenerator.construct_pod(
dag_id=self.dag_id,
task_id=self.task_id,
@@ -478,7 +467,7 @@ class TestPodGenerator:
expected.metadata.labels["map_index"] = "0"
expected.metadata.annotations = self.annotations
expected.metadata.annotations["map_index"] = "0"
- expected.metadata.name = "pod_id-" + self.static_uuid.hex
+ expected.metadata.name = "pod_id"
expected.metadata.namespace = "test_namespace"
expected.spec.containers[0].args = ["command"]
del expected.spec.containers[0].env_from[1:]
@@ -489,11 +478,9 @@ class TestPodGenerator:
assert expected_dict == result_dict
- @mock.patch("uuid.uuid4")
- def test_construct_pod_empty_executor_config(self, mock_uuid):
+ def test_construct_pod_empty_executor_config(self):
path = sys.path[0] + "/tests/kubernetes/pod_generator_base_with_secrets.yaml"
worker_config = PodGenerator.deserialize_model_file(path)
- mock_uuid.return_value = self.static_uuid
executor_config = None
result = PodGenerator.construct_pod(
@@ -515,23 +502,23 @@ class TestPodGenerator:
worker_config.metadata.annotations = self.annotations
worker_config.metadata.labels = self.labels
worker_config.metadata.labels["app"] = "myapp"
- worker_config.metadata.name = "pod_id-" + self.static_uuid.hex
+ worker_config.metadata.name = "pod_id"
worker_config.metadata.namespace = "namespace"
worker_config.spec.containers[0].env.append(
k8s.V1EnvVar(name="AIRFLOW_IS_K8S_EXECUTOR_POD", value="True")
)
worker_config_result = self.k8s_client.sanitize_for_serialization(worker_config)
- assert worker_config_result == sanitized_result
+ assert sanitized_result == worker_config_result
- @mock.patch("uuid.uuid4")
- def test_construct_pod_attribute_error(self, mock_uuid):
+ @mock.patch("airflow.kubernetes.kubernetes_helper_functions.rand_str")
+ def test_construct_pod_attribute_error(self, mock_rand_str):
"""
After upgrading k8s library we might get attribute error.
In this case it should raise PodReconciliationError
"""
path = sys.path[0] + "/tests/kubernetes/pod_generator_base_with_secrets.yaml"
worker_config = PodGenerator.deserialize_model_file(path)
- mock_uuid.return_value = self.static_uuid
+ mock_rand_str.return_value = self.rand_str
executor_config = MagicMock()
executor_config.side_effect = AttributeError("error")
@@ -550,9 +537,9 @@ class TestPodGenerator:
scheduler_job_id="uuid",
)
- @mock.patch("uuid.uuid4")
- def test_ensure_max_label_length(self, mock_uuid):
- mock_uuid.return_value = self.static_uuid
+ @mock.patch("airflow.kubernetes.kubernetes_helper_functions.rand_str")
+ def test_ensure_max_identifier_length(self, mock_rand_str):
+ mock_rand_str.return_value = self.rand_str
path = os.path.join(os.path.dirname(__file__), "pod_generator_base_with_secrets.yaml")
worker_config = PodGenerator.deserialize_model_file(path)
@@ -570,7 +557,7 @@ class TestPodGenerator:
base_worker_pod=worker_config,
)
- assert result.metadata.name == "a" * 30 + "-" + self.static_uuid.hex
+ assert result.metadata.name == "a" * 244 + "-" + self.rand_str
for _, v in result.metadata.labels.items():
assert len(v) <= 63
@@ -727,30 +714,33 @@ class TestPodGenerator:
assert len(caplog.records) == 1
assert "does not exist" in caplog.text
- @parameterized.expand(
+ @pytest.mark.parametrize(
+ "input",
(
- ("max_label_length", "a" * 63),
- ("max_subdomain_length", "a" * 253),
- (
- "tiny",
- "aaa",
- ),
- )
+ param("a" * 70, id="max_label_length"),
+ param("a" * 253, id="max_subdomain_length"),
+ param("a" * 95, id="close to max"),
+ param("aaa", id="tiny"),
+ ),
)
- def test_pod_name_confirm_to_max_length(self, _, pod_id):
- name = PodGenerator.make_unique_pod_id(pod_id)
- assert len(name) <= 253
- parts = name.split("-")
-
- # 63 is the MAX_LABEL_LEN in pod_generator.py
- # 33 is the length of uuid4 + 1 for the separating '-' (32 + 1)
- # 30 is the max length of the prefix
- # so 30 = 63 - (32 + 1)
- assert len(parts[0]) <= 30
- assert len(parts[1]) == 32
-
- @parameterized.expand(
+ def test_pod_name_confirm_to_max_length(self, input):
+ actual = PodGenerator.make_unique_pod_id(input)
+ assert len(actual) <= 100
+ actual_base, actual_suffix = actual.rsplit("-", maxsplit=1)
+ # we limit pod id length to 100
+ # random suffix is 8 chars plus the '-' separator
+ # so actual pod id base should first 91 chars of requested pod id
+ assert actual_base == input[:91]
+ # suffix should always be 8, the random alphanum
+ assert re.match(r"^[a-z0-9]{8}$", actual_suffix)
+
+ @pytest.mark.parametrize(
+ "pod_id, expected_starts_with",
(
+ (
+ "somewhat-long-pod-name-maybe-longer-than-previously-supported-with-hyphen-",
+ "somewhat-long-pod-name-maybe-longer-than-previously-supported-with-hyphen",
+ ),
("pod-name-with-hyphen-", "pod-name-with-hyphen"),
("pod-name-with-double-hyphen--", "pod-name-with-double-hyphen"),
("pod0-name", "pod0-name"),
@@ -758,17 +748,22 @@ class TestPodGenerator:
("pod-name-with-dot.", "pod-name-with-dot"),
("pod-name-with-double-dot..", "pod-name-with-double-dot"),
("pod-name-with-hyphen-dot-.", "pod-name-with-hyphen-dot"),
- )
+ ),
)
def test_pod_name_is_valid(self, pod_id, expected_starts_with):
- name = PodGenerator.make_unique_pod_id(pod_id)
-
+ """
+ `make_unique_pod_id` doesn't actually guarantee that the regex passes for any input.
+ But I guess this test verifies that an otherwise valid pod_id doesn't get _screwed up_.
+ """
+ actual = PodGenerator.make_unique_pod_id(pod_id)
+ assert len(actual) <= 253
+ assert actual == actual.lower(), "not lowercase"
+ # verify using official k8s regex
regex = r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$"
- assert (
- len(name) <= 253 and all(ch.lower() == ch for ch in name) and re.match(regex, name)
- ), "pod_id is invalid - fails allowed regex check"
-
- assert name.rsplit("-", 1)[0] == expected_starts_with
+ assert re.match(regex, actual), "pod_id is invalid - fails allowed regex check"
+ assert actual.rsplit("-", 1)[0] == expected_starts_with
+ # verify ends with 8 char lowercase alphanum string
+ assert re.match(rf"^{expected_starts_with}-[a-z0-9]{{8}}$", actual), "doesn't match expected pattern"
def test_validate_pod_generator(self):
with pytest.raises(AirflowConfigException):
diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
index f202285504..844803947a 100644
--- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
@@ -33,7 +33,6 @@ from airflow.models.xcom import XCom
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
KubernetesPodOperator,
_optionally_suppress,
- _task_id_to_pod_name,
)
from airflow.utils import timezone
from airflow.utils.session import create_session
@@ -992,7 +991,7 @@ class TestKubernetesPodOperator:
random_name_suffix=False,
)
pod = k.build_pod_request_obj({})
- assert pod.metadata.name == "0.hi.--09hi"
+ assert pod.metadata.name == "hi-09hi"
def test_task_id_as_name_with_suffix(self):
k = KubernetesPodOperator(
@@ -1000,9 +999,9 @@ class TestKubernetesPodOperator:
random_name_suffix=True,
)
pod = k.build_pod_request_obj({})
- expected = "0.hi.--09hi"
- assert pod.metadata.name.startswith(expected)
- assert re.match(rf"{expected}-[a-z0-9-]+", pod.metadata.name) is not None
+ expected = "hi-09hi"
+ assert pod.metadata.name[: len(expected)] == expected
+ assert re.match(rf"{expected}-[a-z0-9]{{8}}", pod.metadata.name) is not None
def test_task_id_as_name_with_suffix_very_long(self):
k = KubernetesPodOperator(
@@ -1010,7 +1009,13 @@ class TestKubernetesPodOperator:
random_name_suffix=True,
)
pod = k.build_pod_request_obj({})
- assert re.match(r"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-[a-z0-9-]+", pod.metadata.name) is not None
+ assert (
+ re.match(
+ r"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-[a-z0-9]{8}",
+ pod.metadata.name,
+ )
+ is not None
+ )
def test_task_id_as_name_dag_id_is_ignored(self):
dag = DAG(dag_id="this_is_a_dag_name", start_date=pendulum.now())
@@ -1078,23 +1083,3 @@ class TestSuppress:
with _optionally_suppress():
print("hi")
assert caplog.text == ""
-
-
-@pytest.mark.parametrize(
- "val, expected",
- [
- ("task-id", "task-id"), # no problem
- ("task_id", "task-id"), # underscores
- ("task.id", "task.id"), # dots ok
- (".task.id", "0.task.id"), # leading dot invalid
- ("-90Abc*&", "0-90abc--0"), # invalid ends
- ("90AçLbˆˆç˙ßߘ˜˙c*a", "90a-lb---------c-a"), # weird unicode
- ],
-)
-def test_task_id_to_pod_name(val, expected):
- assert _task_id_to_pod_name(val) == expected
-
-
-def test_task_id_to_pod_name_long():
- with pytest.raises(ValueError, match="longer than 253"):
- _task_id_to_pod_name("0" * 254)