You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/12/05 02:51:42 UTC

[airflow] branch main updated: Allow longer pod names for k8s executor / KPO (#27736)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d93240696b Allow longer pod names for k8s executor / KPO (#27736)
d93240696b is described below

commit d93240696beeca7d28542d0fe0b53871b3d6612c
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Sun Dec 4 18:51:35 2022 -0800

    Allow longer pod names for k8s executor / KPO (#27736)
    
    Previously this limited pod length to 63 but that's the limit for label values not pod ids.  I assume that's just an atavism.  Meanwhile, a UUID was previously used which takes up 32 characters, much more space than necessary.  We use 8 random alphanum chars now which should be plenty.
---
 airflow/kubernetes/kubernetes_helper_functions.py  |  42 +++++-
 airflow/kubernetes/pod_generator.py                |  35 +++--
 .../cncf/kubernetes/operators/kubernetes_pod.py    |  85 ++++++++----
 tests/kubernetes/models/test_secret.py             |  18 +--
 .../kubernetes/test_kubernetes_helper_functions.py | 132 +++++++++++--------
 tests/kubernetes/test_pod_generator.py             | 145 ++++++++++-----------
 .../kubernetes/operators/test_kubernetes_pod.py    |  37 ++----
 7 files changed, 285 insertions(+), 209 deletions(-)

diff --git a/airflow/kubernetes/kubernetes_helper_functions.py b/airflow/kubernetes/kubernetes_helper_functions.py
index 1f0c809cf9..6422cb35b4 100644
--- a/airflow/kubernetes/kubernetes_helper_functions.py
+++ b/airflow/kubernetes/kubernetes_helper_functions.py
@@ -17,6 +17,8 @@
 from __future__ import annotations
 
 import logging
+import secrets
+import string
 
 import pendulum
 from slugify import slugify
@@ -25,17 +27,41 @@ from airflow.models.taskinstance import TaskInstanceKey
 
 log = logging.getLogger(__name__)
 
+alphanum_lower = string.ascii_lowercase + string.digits
 
-def create_pod_id(dag_id: str | None = None, task_id: str | None = None) -> str:
+
+def rand_str(num):
+    """Generate random lowercase alphanumeric string of length num.
+
+    :meta private:
+    """
+    return "".join(secrets.choice(alphanum_lower) for _ in range(num))
+
+
+def add_pod_suffix(*, pod_name, rand_len=8, max_len=80):
+    """Add random string to pod name while staying under max len"""
+    suffix = "-" + rand_str(rand_len)
+    return pod_name[: max_len - len(suffix)].strip("-.") + suffix
+
+
+def create_pod_id(
+    dag_id: str | None = None,
+    task_id: str | None = None,
+    *,
+    max_length: int = 80,
+    unique: bool = True,
+) -> str:
     """
-    Generates the kubernetes safe pod_id. Note that this is
-    NOT the full ID that will be launched to k8s. We will add a uuid
-    to ensure uniqueness.
+    Generates unique pod ID given a dag_id and / or task_id.
 
     :param dag_id: DAG ID
     :param task_id: Task ID
-    :return: The non-unique pod_id for this task/DAG pairing
+    :param max_length: max number of characters
+    :param unique: whether a random string suffix should be added
+    :return: A valid identifier for a kubernetes pod name
     """
+    if not (dag_id or task_id):
+        raise ValueError("Must supply either dag_id or task_id.")
     name = ""
     if dag_id:
         name += dag_id
@@ -43,7 +69,11 @@ def create_pod_id(dag_id: str | None = None, task_id: str | None = None) -> str:
         if name:
             name += "-"
         name += task_id
-    return slugify(name, lowercase=True)[:253].strip("-.")
+    base_name = slugify(name, lowercase=True)[:max_length].strip(".-")
+    if unique:
+        return add_pod_suffix(pod_name=base_name, rand_len=8, max_len=max_length)
+    else:
+        return base_name
 
 
 def annotations_to_key(annotations: dict[str, str]) -> TaskInstanceKey:
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index 4382b390aa..7af1632559 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -28,7 +28,6 @@ import hashlib
 import logging
 import os
 import re
-import uuid
 import warnings
 from functools import reduce
 
@@ -37,6 +36,7 @@ from kubernetes.client import models as k8s
 from kubernetes.client.api_client import ApiClient
 
 from airflow.exceptions import AirflowConfigException, PodReconciliationError, RemovedInAirflow3Warning
+from airflow.kubernetes.kubernetes_helper_functions import add_pod_suffix, rand_str
 from airflow.kubernetes.pod_generator_deprecated import PodDefaults, PodGenerator as PodGeneratorDeprecated
 from airflow.utils import yaml
 from airflow.version import version as airflow_version
@@ -125,9 +125,10 @@ class PodGenerator:
 
     def gen_pod(self) -> k8s.V1Pod:
         """Generates pod"""
+        warnings.warn("This function is deprecated. ", RemovedInAirflow3Warning)
         result = self.ud_pod
 
-        result.metadata.name = self.make_unique_pod_id(result.metadata.name)
+        result.metadata.name = add_pod_suffix(pod_name=result.metadata.name)
 
         if self.extract_xcom:
             result = self.add_xcom_sidecar(result)
@@ -340,6 +341,11 @@ class PodGenerator:
             - executor_config
             - dynamic arguments
         """
+        if len(pod_id) > 253:
+            warnings.warn(
+                "pod_id supplied is longer than 253 characters; truncating and adding unique suffix."
+            )
+            pod_id = add_pod_suffix(pod_name=pod_id, max_len=253)
         try:
             image = pod_override_object.spec.containers[0].image  # type: ignore
             if not image:
@@ -374,7 +380,7 @@ class PodGenerator:
             metadata=k8s.V1ObjectMeta(
                 namespace=namespace,
                 annotations=annotations,
-                name=PodGenerator.make_unique_pod_id(pod_id),
+                name=pod_id,
                 labels=labels,
             ),
             spec=k8s.V1PodSpec(
@@ -445,7 +451,7 @@ class PodGenerator:
         r"""
         Kubernetes pod names must consist of one or more lowercase
         rfc1035/rfc1123 labels separated by '.' with a maximum length of 253
-        characters. Each label has a maximum length of 63 characters.
+        characters.
 
         Name must pass the following regex for validation
         ``^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$``
@@ -453,21 +459,22 @@ class PodGenerator:
         For more details, see:
         https://github.com/kubernetes/kubernetes/blob/release-1.1/docs/design/identifiers.md
 
-        :param pod_id: a dag_id with only alphanumeric characters
+        :param pod_id: requested pod name
         :return: ``str`` valid Pod name of appropriate length
         """
+        warnings.warn(
+            "This function is deprecated. Use `add_pod_suffix` in `kubernetes_helper_functions`.",
+            RemovedInAirflow3Warning,
+        )
+
         if not pod_id:
             return None
 
-        safe_uuid = uuid.uuid4().hex  # safe uuid will always be less than 63 chars
-
-        # Get prefix length after subtracting the uuid length. Clean up '.' and '-' from
-        # end of podID ('.' can't be followed by '-').
-        label_prefix_length = MAX_LABEL_LEN - len(safe_uuid) - 1  # -1 for separator
-        trimmed_pod_id = pod_id[:label_prefix_length].rstrip("-.")
-
-        # previously used a '.' as the separator, but this could create errors in some situations
-        return f"{trimmed_pod_id}-{safe_uuid}"
+        max_pod_id_len = 100  # arbitrarily chosen
+        suffix = rand_str(8)  # 8 seems good enough
+        base_pod_id_len = max_pod_id_len - len(suffix) - 1  # -1 for separator
+        trimmed_pod_id = pod_id[:base_pod_id_len].rstrip("-.")
+        return f"{trimmed_pod_id}-{suffix}"
 
 
 def merge_objects(base_obj, client_obj):
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index 990015b625..6aafce2f12 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -20,11 +20,14 @@ from __future__ import annotations
 import json
 import logging
 import re
+import secrets
+import string
 import warnings
 from contextlib import AbstractContextManager
 from typing import TYPE_CHECKING, Any, Sequence
 
 from kubernetes.client import CoreV1Api, models as k8s
+from slugify import slugify
 
 from airflow.compat.functools import cached_property
 from airflow.exceptions import AirflowException
@@ -61,30 +64,62 @@ if TYPE_CHECKING:
 
     from airflow.utils.context import Context
 
+alphanum_lower = string.ascii_lowercase + string.digits
 
-def _task_id_to_pod_name(val: str) -> str:
+
+def _rand_str(num):
+    """Generate random lowercase alphanumeric string of length num.
+
+    TODO: when min airflow version >= 2.5, delete this function and import from kubernetes_helper_functions.
+
+    :meta private:
+    """
+    return "".join(secrets.choice(alphanum_lower) for _ in range(num))
+
+
+def _add_pod_suffix(*, pod_name, rand_len=8, max_len=253):
+    """Add random string to pod name while staying under max len
+
+    TODO: when min airflow version >= 2.5, delete this function and import from kubernetes_helper_functions.
+
+    :meta private:
     """
-    Given a task_id, convert it to a pod name.
-    Adds a 0 if start or end char is invalid.
-    Replaces any other invalid char with `-`.
+    suffix = "-" + _rand_str(rand_len)
+    return pod_name[: max_len - len(suffix)].strip("-.") + suffix
 
-    :param val: non-empty string, presumed to be a task id
-    :return valid kubernetes object name.
+
+def _create_pod_id(
+    dag_id: str | None = None,
+    task_id: str | None = None,
+    *,
+    max_length: int = 80,
+    unique: bool = True,
+) -> str:
     """
-    if not val:
-        raise ValueError("_task_id_to_pod_name requires non-empty string.")
-    val = val.lower()
-    if not re.match(r"[a-z0-9]", val[0]):
-        val = f"0{val}"
-    if not re.match(r"[a-z0-9]", val[-1]):
-        val = f"{val}0"
-    val = re.sub(r"[^a-z0-9\-.]", "-", val)
-    if len(val) > 253:
-        raise ValueError(
-            f"Pod name {val} is longer than 253 characters. "
-            "See https://kubernetes.io/docs/concepts/overview/working-with-objects/names/."
-        )
-    return val
+    Generates unique pod ID given a dag_id and / or task_id.
+
+    TODO: when min airflow version >= 2.5, delete this function and import from kubernetes_helper_functions.
+
+    :param dag_id: DAG ID
+    :param task_id: Task ID
+    :param max_length: max number of characters
+    :param unique: whether a random string suffix should be added
+    :return: A valid identifier for a kubernetes pod name
+    """
+    if not (dag_id or task_id):
+        raise ValueError("Must supply either dag_id or task_id.")
+    name = ""
+    if dag_id:
+        name += dag_id
+    if task_id:
+        if name:
+            name += "-"
+        name += task_id
+    base_name = slugify(name, lowercase=True)[:max_length].strip(".-")
+    if unique:
+        return _add_pod_suffix(pod_name=base_name, max_len=max_length)
+    else:
+        return base_name
 
 
 class PodReattachFailure(AirflowException):
@@ -198,7 +233,7 @@ class KubernetesPodOperator(BaseOperator):
         namespace: str | None = None,
         image: str | None = None,
         name: str | None = None,
-        random_name_suffix: bool | None = True,
+        random_name_suffix: bool = True,
         cmds: list[str] | None = None,
         arguments: list[str] | None = None,
         ports: list[k8s.V1ContainerPort] | None = None,
@@ -613,10 +648,10 @@ class KubernetesPodOperator(BaseOperator):
         pod = PodGenerator.reconcile_pods(pod_template, pod)
 
         if not pod.metadata.name:
-            pod.metadata.name = _task_id_to_pod_name(self.task_id)
-
-        if self.random_name_suffix:
-            pod.metadata.name = PodGenerator.make_unique_pod_id(pod.metadata.name)
+            pod.metadata.name = _create_pod_id(task_id=self.task_id, unique=self.random_name_suffix)
+        elif self.random_name_suffix:
+            # user has supplied pod name, we're just adding suffix
+            pod.metadata.name = _add_pod_suffix(pod_name=pod.metadata.name)
 
         if not pod.metadata.namespace:
             # todo: replace with call to `hook.get_namespace` in 6.0, when it doesn't default to `default`.
diff --git a/tests/kubernetes/models/test_secret.py b/tests/kubernetes/models/test_secret.py
index 2a072d0401..df7e4b7817 100644
--- a/tests/kubernetes/models/test_secret.py
+++ b/tests/kubernetes/models/test_secret.py
@@ -63,11 +63,14 @@ class TestSecret:
         )
 
     @mock.patch("uuid.uuid4")
-    def test_attach_to_pod(self, mock_uuid):
+    @mock.patch("airflow.kubernetes.pod_generator.rand_str")
+    def test_attach_to_pod(self, mock_rand_str, mock_uuid):
         static_uuid = uuid.UUID("cf4a56d2-8101-4217-b027-2af6216feb48")
         mock_uuid.return_value = static_uuid
+        rand_str = "abcd1234"
+        mock_rand_str.return_value = rand_str
         path = sys.path[0] + "/tests/kubernetes/pod_generator_base.yaml"
-        pod = PodGenerator(pod_template_file=path).gen_pod()
+        pod = PodGenerator(pod_template_file=path).ud_pod
         secrets = [
             # This should be a secretRef
             Secret("env", None, "secret_a"),
@@ -84,7 +87,7 @@ class TestSecret:
             "kind": "Pod",
             "metadata": {
                 "labels": {"app": "myapp"},
-                "name": "myapp-pod-cf4a56d281014217b0272af6216feb48",
+                "name": "myapp-pod",
                 "namespace": "default",
             },
             "spec": {
@@ -108,7 +111,6 @@ class TestSecret:
                         "ports": [{"containerPort": 1234, "name": "foo"}],
                         "resources": {"limits": {"memory": "200Mi"}, "requests": {"memory": "100Mi"}},
                         "volumeMounts": [
-                            {"mountPath": "/airflow/xcom", "name": "xcom"},
                             {
                                 "mountPath": "/etc/foo",
                                 "name": "secretvol" + str(static_uuid),
@@ -116,19 +118,11 @@ class TestSecret:
                             },
                         ],
                     },
-                    {
-                        "command": ["sh", "-c", 'trap "exit 0" INT; while true; do sleep 30; done;'],
-                        "image": "alpine",
-                        "name": "airflow-xcom-sidecar",
-                        "resources": {"requests": {"cpu": "1m"}},
-                        "volumeMounts": [{"mountPath": "/airflow/xcom", "name": "xcom"}],
-                    },
                 ],
                 "hostNetwork": True,
                 "imagePullSecrets": [{"name": "pull_secret_a"}, {"name": "pull_secret_b"}],
                 "securityContext": {"fsGroup": 2000, "runAsUser": 1000},
                 "volumes": [
-                    {"emptyDir": {}, "name": "xcom"},
                     {"name": "secretvol" + str(static_uuid), "secret": {"secretName": "secret_b"}},
                 ],
             },
diff --git a/tests/kubernetes/test_kubernetes_helper_functions.py b/tests/kubernetes/test_kubernetes_helper_functions.py
index b90125aeb8..43bdab47e1 100644
--- a/tests/kubernetes/test_kubernetes_helper_functions.py
+++ b/tests/kubernetes/test_kubernetes_helper_functions.py
@@ -20,67 +20,97 @@ from __future__ import annotations
 import re
 
 import pytest
+from pytest import param
 
 from airflow.kubernetes.kubernetes_helper_functions import create_pod_id
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import _create_pod_id
 
 pod_name_regex = r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$"
 
 
+# todo: when cncf provider min airflow version >= 2.5 remove this parameterization
+# we added this function to provider temporarily until min airflow version catches up
+# meanwhile, we use this one test to test both core and provider
 @pytest.mark.parametrize(
-    "val, expected",
-    [
-        ("task-id", "task-id"),  # no problem
-        ("task_id", "task-id"),  # underscores
-        ("---task.id---", "task-id"),  # dots
-        (".task.id", "task-id"),  # leading dot invalid
-        ("**task.id", "task-id"),  # leading dot invalid
-        ("-90Abc*&", "90abc"),  # invalid ends
-        ("90AçLbˆˆç˙ßߘ˜˙c*a", "90aclb-c-ssss-c-a"),  # weird unicode
-    ],
+    "create_pod_id", [param(_create_pod_id, id="provider"), param(create_pod_id, id="core")]
 )
-def test_create_pod_id_task_only(val, expected):
-    actual = create_pod_id(task_id=val)
-    assert actual == expected
-    assert re.match(pod_name_regex, actual)
+class TestCreatePodId:
+    @pytest.mark.parametrize(
+        "val, expected",
+        [
+            ("task-id", "task-id"),  # no problem
+            ("task_id", "task-id"),  # underscores
+            ("---task.id---", "task-id"),  # dots
+            (".task.id", "task-id"),  # leading dot invalid
+            ("**task.id", "task-id"),  # leading dot invalid
+            ("-90Abc*&", "90abc"),  # invalid ends
+            ("90AçLbˆˆç˙ßߘ˜˙c*a", "90aclb-c-ssss-c-a"),  # weird unicode
+        ],
+    )
+    def test_create_pod_id_task_only(self, val, expected, create_pod_id):
+        actual = create_pod_id(task_id=val, unique=False)
+        assert actual == expected
+        assert re.match(pod_name_regex, actual)
 
+    @pytest.mark.parametrize(
+        "val, expected",
+        [
+            ("dag-id", "dag-id"),  # no problem
+            ("dag_id", "dag-id"),  # underscores
+            ("---dag.id---", "dag-id"),  # dots
+            (".dag.id", "dag-id"),  # leading dot invalid
+            ("**dag.id", "dag-id"),  # leading dot invalid
+            ("-90Abc*&", "90abc"),  # invalid ends
+            ("90AçLbˆˆç˙ßߘ˜˙c*a", "90aclb-c-ssss-c-a"),  # weird unicode
+        ],
+    )
+    def test_create_pod_id_dag_only(self, val, expected, create_pod_id):
+        actual = create_pod_id(dag_id=val, unique=False)
+        assert actual == expected
+        assert re.match(pod_name_regex, actual)
 
-@pytest.mark.parametrize(
-    "val, expected",
-    [
-        ("dag-id", "dag-id"),  # no problem
-        ("dag_id", "dag-id"),  # underscores
-        ("---dag.id---", "dag-id"),  # dots
-        (".dag.id", "dag-id"),  # leading dot invalid
-        ("**dag.id", "dag-id"),  # leading dot invalid
-        ("-90Abc*&", "90abc"),  # invalid ends
-        ("90AçLbˆˆç˙ßߘ˜˙c*a", "90aclb-c-ssss-c-a"),  # weird unicode
-    ],
-)
-def test_create_pod_id_dag_only(val, expected):
-    actual = create_pod_id(dag_id=val)
-    assert actual == expected
-    assert re.match(pod_name_regex, actual)
-
+    @pytest.mark.parametrize(
+        "dag_id, task_id, expected",
+        [
+            ("dag-id", "task-id", "dag-id-task-id"),  # no problem
+            ("dag_id", "task_id", "dag-id-task-id"),  # underscores
+            ("dag.id", "task.id", "dag-id-task-id"),  # dots
+            (".dag.id", ".---task.id", "dag-id-task-id"),  # leading dot invalid
+            ("**dag.id", "**task.id", "dag-id-task-id"),  # leading dot invalid
+            ("-90Abc*&", "-90Abc*&", "90abc-90abc"),  # invalid ends
+            ("90AçLbˆˆç˙ßߘ˜˙c*a", "90AçLbˆˆç˙ßߘ˜˙c*a", "90aclb-c-ssss-c-a-90aclb-c-ssss-c-a"),  # ugly
+        ],
+    )
+    def test_create_pod_id_dag_and_task(self, dag_id, task_id, expected, create_pod_id):
+        actual = create_pod_id(dag_id=dag_id, task_id=task_id, unique=False)
+        assert actual == expected
+        assert re.match(pod_name_regex, actual)
 
-@pytest.mark.parametrize(
-    "dag_id, task_id, expected",
-    [
-        ("dag-id", "task-id", "dag-id-task-id"),  # no problem
-        ("dag_id", "task_id", "dag-id-task-id"),  # underscores
-        ("dag.id", "task.id", "dag-id-task-id"),  # dots
-        (".dag.id", ".---task.id", "dag-id-task-id"),  # leading dot invalid
-        ("**dag.id", "**task.id", "dag-id-task-id"),  # leading dot invalid
-        ("-90Abc*&", "-90Abc*&", "90abc-90abc"),  # invalid ends
-        ("90AçLbˆˆç˙ßߘ˜˙c*a", "90AçLbˆˆç˙ßߘ˜˙c*a", "90aclb-c-ssss-c-a-90aclb-c-ssss-c-a"),  # ugly
-    ],
-)
-def test_create_pod_id_dag_and_task(dag_id, task_id, expected):
-    actual = create_pod_id(dag_id=dag_id, task_id=task_id)
-    assert actual == expected
-    assert re.match(pod_name_regex, actual)
+    def test_create_pod_id_dag_too_long_with_suffix(self, create_pod_id):
+        actual = create_pod_id("0" * 254)
+        assert re.match(r"0{71}-[a-z0-9]{8}", actual)
+        assert re.match(pod_name_regex, actual)
 
+    def test_create_pod_id_dag_too_long_non_unique(self, create_pod_id):
+        actual = create_pod_id("0" * 254, unique=False)
+        assert re.match(r"0{80}", actual)
+        assert re.match(pod_name_regex, actual)
 
-def test_create_pod_id_dag_too_long():
-    actual = create_pod_id("0" * 254)
-    assert actual == "0" * 253
-    assert re.match(pod_name_regex, actual)
+    @pytest.mark.parametrize("unique", [True, False])
+    @pytest.mark.parametrize("length", [25, 100, 200, 300])
+    def test_create_pod_id(self, create_pod_id, length, unique):
+        """Test behavior of max_length and unique."""
+        dag_id = "dag-dag-dag-dag-dag-dag-dag-dag-dag-dag-dag-dag-dag-dag-dag-dag-"
+        task_id = "task-task-task-task-task-task-task-task-task-task-task-task-task-task-task-task-task-"
+        actual = create_pod_id(
+            dag_id=dag_id,
+            task_id=task_id,
+            max_length=length,
+            unique=unique,
+        )
+        base = f"{dag_id}{task_id}".strip("-")
+        if unique:
+            assert actual[:-9] == base[: length - 9].strip("-")
+            assert re.match(r"-[a-z0-9]{8}", actual[-9:])
+        else:
+            assert actual == base[:length]
diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py
index bae79fd2a5..1314f763e8 100644
--- a/tests/kubernetes/test_pod_generator.py
+++ b/tests/kubernetes/test_pod_generator.py
@@ -19,14 +19,13 @@ from __future__ import annotations
 import os
 import re
 import sys
-import uuid
 from unittest import mock
 from unittest.mock import MagicMock
 
 import pytest
 from dateutil import parser
 from kubernetes.client import ApiClient, models as k8s
-from parameterized import parameterized
+from pytest import param
 
 from airflow import __version__
 from airflow.exceptions import AirflowConfigException, PodReconciliationError
@@ -42,7 +41,7 @@ from airflow.kubernetes.secret import Secret
 
 class TestPodGenerator:
     def setup_method(self):
-        self.static_uuid = uuid.UUID("cf4a56d2-8101-4217-b027-2af6216feb48")
+        self.rand_str = "abcd1234"
         self.deserialize_result = {
             "apiVersion": "v1",
             "kind": "Pod",
@@ -92,7 +91,7 @@ class TestPodGenerator:
         }
         self.metadata = {
             "labels": self.labels,
-            "name": "pod_id-" + self.static_uuid.hex,
+            "name": "pod_id-" + self.rand_str,
             "namespace": "namespace",
             "annotations": self.annotations,
         }
@@ -112,7 +111,7 @@ class TestPodGenerator:
             kind="Pod",
             metadata=k8s.V1ObjectMeta(
                 namespace="default",
-                name="myapp-pod-" + self.static_uuid.hex,
+                name="myapp-pod-" + self.rand_str,
                 labels={"app": "myapp"},
             ),
             spec=k8s.V1PodSpec(
@@ -160,14 +159,17 @@ class TestPodGenerator:
             ),
         )
 
-    @mock.patch("uuid.uuid4")
-    def test_gen_pod_extract_xcom(self, mock_uuid):
-        mock_uuid.return_value = self.static_uuid
+    @mock.patch("airflow.kubernetes.kubernetes_helper_functions.rand_str")
+    def test_gen_pod_extract_xcom(self, mock_rand_str):
+        """
+        Method gen_pod is used nowhere in codebase and is deprecated.
+        This test is only retained for backcompat.
+        """
+        mock_rand_str.return_value = self.rand_str
         path = sys.path[0] + "/tests/kubernetes/pod_generator_base_with_secrets.yaml"
 
         pod_generator = PodGenerator(pod_template_file=path, extract_xcom=True)
         result = pod_generator.gen_pod()
-        result_dict = self.k8s_client.sanitize_for_serialization(result)
         container_two = {
             "name": "airflow-xcom-sidecar",
             "image": "alpine",
@@ -189,7 +191,6 @@ class TestPodGenerator:
         )
         result_dict = self.k8s_client.sanitize_for_serialization(result)
         expected_dict = self.k8s_client.sanitize_for_serialization(self.expected)
-
         assert result_dict == expected_dict
 
     def test_from_obj(self):
@@ -322,18 +323,11 @@ class TestPodGenerator:
             },
         } == result
 
-    @mock.patch("uuid.uuid4")
-    def test_reconcile_pods_empty_mutator_pod(self, mock_uuid):
-        mock_uuid.return_value = self.static_uuid
+    def test_reconcile_pods_empty_mutator_pod(self):
         path = sys.path[0] + "/tests/kubernetes/pod_generator_base_with_secrets.yaml"
-
         pod_generator = PodGenerator(pod_template_file=path, extract_xcom=True)
-        base_pod = pod_generator.gen_pod()
+        base_pod = pod_generator.ud_pod
         mutator_pod = None
-        name = "name1-" + self.static_uuid.hex
-
-        base_pod.metadata.name = name
-
         result = PodGenerator.reconcile_pods(base_pod, mutator_pod)
         assert base_pod == result
 
@@ -341,12 +335,12 @@ class TestPodGenerator:
         result = PodGenerator.reconcile_pods(base_pod, mutator_pod)
         assert base_pod == result
 
-    @mock.patch("uuid.uuid4")
-    def test_reconcile_pods(self, mock_uuid):
-        mock_uuid.return_value = self.static_uuid
+    @mock.patch("airflow.kubernetes.kubernetes_helper_functions.rand_str")
+    def test_reconcile_pods(self, mock_rand_str):
+        mock_rand_str.return_value = self.rand_str
         path = sys.path[0] + "/tests/kubernetes/pod_generator_base_with_secrets.yaml"
 
-        base_pod = PodGenerator(pod_template_file=path, extract_xcom=False).gen_pod()
+        base_pod = PodGenerator(pod_template_file=path, extract_xcom=False).ud_pod
 
         mutator_pod = k8s.V1Pod(
             metadata=k8s.V1ObjectMeta(
@@ -400,15 +394,13 @@ class TestPodGenerator:
     @pytest.mark.parametrize(
         "config_image, expected_image",
         [
-            pytest.param("my_image:my_tag", "my_image:my_tag", id="image_in_cfg"),
-            pytest.param(None, "busybox", id="no_image_in_cfg"),
+            param("my_image:my_tag", "my_image:my_tag", id="image_in_cfg"),
+            param(None, "busybox", id="no_image_in_cfg"),
         ],
     )
-    @mock.patch("uuid.uuid4")
-    def test_construct_pod(self, mock_uuid, config_image, expected_image):
+    def test_construct_pod(self, config_image, expected_image):
         template_file = sys.path[0] + "/tests/kubernetes/pod_generator_base_with_secrets.yaml"
         worker_config = PodGenerator.deserialize_model_file(template_file)
-        mock_uuid.return_value = self.static_uuid
         executor_config = k8s.V1Pod(
             spec=k8s.V1PodSpec(
                 containers=[
@@ -436,7 +428,7 @@ class TestPodGenerator:
         expected.metadata.labels = self.labels
         expected.metadata.labels["app"] = "myapp"
         expected.metadata.annotations = self.annotations
-        expected.metadata.name = "pod_id-" + self.static_uuid.hex
+        expected.metadata.name = "pod_id"
         expected.metadata.namespace = "test_namespace"
         expected.spec.containers[0].args = ["command"]
         expected.spec.containers[0].image = expected_image
@@ -452,12 +444,9 @@ class TestPodGenerator:
 
         assert expected_dict == result_dict
 
-    @mock.patch("uuid.uuid4")
-    def test_construct_pod_mapped_task(self, mock_uuid):
+    def test_construct_pod_mapped_task(self):
         template_file = sys.path[0] + "/tests/kubernetes/pod_generator_base.yaml"
         worker_config = PodGenerator.deserialize_model_file(template_file)
-        mock_uuid.return_value = self.static_uuid
-
         result = PodGenerator.construct_pod(
             dag_id=self.dag_id,
             task_id=self.task_id,
@@ -478,7 +467,7 @@ class TestPodGenerator:
         expected.metadata.labels["map_index"] = "0"
         expected.metadata.annotations = self.annotations
         expected.metadata.annotations["map_index"] = "0"
-        expected.metadata.name = "pod_id-" + self.static_uuid.hex
+        expected.metadata.name = "pod_id"
         expected.metadata.namespace = "test_namespace"
         expected.spec.containers[0].args = ["command"]
         del expected.spec.containers[0].env_from[1:]
@@ -489,11 +478,9 @@ class TestPodGenerator:
 
         assert expected_dict == result_dict
 
-    @mock.patch("uuid.uuid4")
-    def test_construct_pod_empty_executor_config(self, mock_uuid):
+    def test_construct_pod_empty_executor_config(self):
         path = sys.path[0] + "/tests/kubernetes/pod_generator_base_with_secrets.yaml"
         worker_config = PodGenerator.deserialize_model_file(path)
-        mock_uuid.return_value = self.static_uuid
         executor_config = None
 
         result = PodGenerator.construct_pod(
@@ -515,23 +502,23 @@ class TestPodGenerator:
         worker_config.metadata.annotations = self.annotations
         worker_config.metadata.labels = self.labels
         worker_config.metadata.labels["app"] = "myapp"
-        worker_config.metadata.name = "pod_id-" + self.static_uuid.hex
+        worker_config.metadata.name = "pod_id"
         worker_config.metadata.namespace = "namespace"
         worker_config.spec.containers[0].env.append(
             k8s.V1EnvVar(name="AIRFLOW_IS_K8S_EXECUTOR_POD", value="True")
         )
         worker_config_result = self.k8s_client.sanitize_for_serialization(worker_config)
-        assert worker_config_result == sanitized_result
+        assert sanitized_result == worker_config_result
 
-    @mock.patch("uuid.uuid4")
-    def test_construct_pod_attribute_error(self, mock_uuid):
+    @mock.patch("airflow.kubernetes.kubernetes_helper_functions.rand_str")
+    def test_construct_pod_attribute_error(self, mock_rand_str):
         """
         After upgrading k8s library we might get attribute error.
         In this case it should raise PodReconciliationError
         """
         path = sys.path[0] + "/tests/kubernetes/pod_generator_base_with_secrets.yaml"
         worker_config = PodGenerator.deserialize_model_file(path)
-        mock_uuid.return_value = self.static_uuid
+        mock_rand_str.return_value = self.rand_str
         executor_config = MagicMock()
         executor_config.side_effect = AttributeError("error")
 
@@ -550,9 +537,9 @@ class TestPodGenerator:
                 scheduler_job_id="uuid",
             )
 
-    @mock.patch("uuid.uuid4")
-    def test_ensure_max_label_length(self, mock_uuid):
-        mock_uuid.return_value = self.static_uuid
+    @mock.patch("airflow.kubernetes.kubernetes_helper_functions.rand_str")
+    def test_ensure_max_identifier_length(self, mock_rand_str):
+        mock_rand_str.return_value = self.rand_str
         path = os.path.join(os.path.dirname(__file__), "pod_generator_base_with_secrets.yaml")
         worker_config = PodGenerator.deserialize_model_file(path)
 
@@ -570,7 +557,7 @@ class TestPodGenerator:
             base_worker_pod=worker_config,
         )
 
-        assert result.metadata.name == "a" * 30 + "-" + self.static_uuid.hex
+        assert result.metadata.name == "a" * 244 + "-" + self.rand_str
         for _, v in result.metadata.labels.items():
             assert len(v) <= 63
 
@@ -727,30 +714,33 @@ class TestPodGenerator:
         assert len(caplog.records) == 1
         assert "does not exist" in caplog.text
 
-    @parameterized.expand(
+    @pytest.mark.parametrize(
+        "input",
         (
-            ("max_label_length", "a" * 63),
-            ("max_subdomain_length", "a" * 253),
-            (
-                "tiny",
-                "aaa",
-            ),
-        )
+            param("a" * 70, id="max_label_length"),
+            param("a" * 253, id="max_subdomain_length"),
+            param("a" * 95, id="close to max"),
+            param("aaa", id="tiny"),
+        ),
     )
-    def test_pod_name_confirm_to_max_length(self, _, pod_id):
-        name = PodGenerator.make_unique_pod_id(pod_id)
-        assert len(name) <= 253
-        parts = name.split("-")
-
-        # 63 is the MAX_LABEL_LEN in pod_generator.py
-        # 33 is the length of uuid4 + 1 for the separating '-' (32 + 1)
-        # 30 is the max length of the prefix
-        # so 30 = 63 - (32 + 1)
-        assert len(parts[0]) <= 30
-        assert len(parts[1]) == 32
-
-    @parameterized.expand(
+    def test_pod_name_confirm_to_max_length(self, input):
+        actual = PodGenerator.make_unique_pod_id(input)
+        assert len(actual) <= 100
+        actual_base, actual_suffix = actual.rsplit("-", maxsplit=1)
+        # we limit pod id length to 100
+        # random suffix is 8 chars plus the '-' separator
+        # so actual pod id base should first 91 chars of requested pod id
+        assert actual_base == input[:91]
+        # suffix should always be 8, the random alphanum
+        assert re.match(r"^[a-z0-9]{8}$", actual_suffix)
+
+    @pytest.mark.parametrize(
+        "pod_id, expected_starts_with",
         (
+            (
+                "somewhat-long-pod-name-maybe-longer-than-previously-supported-with-hyphen-",
+                "somewhat-long-pod-name-maybe-longer-than-previously-supported-with-hyphen",
+            ),
             ("pod-name-with-hyphen-", "pod-name-with-hyphen"),
             ("pod-name-with-double-hyphen--", "pod-name-with-double-hyphen"),
             ("pod0-name", "pod0-name"),
@@ -758,17 +748,22 @@ class TestPodGenerator:
             ("pod-name-with-dot.", "pod-name-with-dot"),
             ("pod-name-with-double-dot..", "pod-name-with-double-dot"),
             ("pod-name-with-hyphen-dot-.", "pod-name-with-hyphen-dot"),
-        )
+        ),
     )
     def test_pod_name_is_valid(self, pod_id, expected_starts_with):
-        name = PodGenerator.make_unique_pod_id(pod_id)
-
+        """
+        `make_unique_pod_id` doesn't actually guarantee that the regex passes for any input.
+        But I guess this test verifies that an otherwise valid pod_id doesn't get _screwed up_.
+        """
+        actual = PodGenerator.make_unique_pod_id(pod_id)
+        assert len(actual) <= 253
+        assert actual == actual.lower(), "not lowercase"
+        # verify using official k8s regex
         regex = r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$"
-        assert (
-            len(name) <= 253 and all(ch.lower() == ch for ch in name) and re.match(regex, name)
-        ), "pod_id is invalid - fails allowed regex check"
-
-        assert name.rsplit("-", 1)[0] == expected_starts_with
+        assert re.match(regex, actual), "pod_id is invalid - fails allowed regex check"
+        assert actual.rsplit("-", 1)[0] == expected_starts_with
+        # verify ends with 8 char lowercase alphanum string
+        assert re.match(rf"^{expected_starts_with}-[a-z0-9]{{8}}$", actual), "doesn't match expected pattern"
 
     def test_validate_pod_generator(self):
         with pytest.raises(AirflowConfigException):
diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
index f202285504..844803947a 100644
--- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
@@ -33,7 +33,6 @@ from airflow.models.xcom import XCom
 from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
     KubernetesPodOperator,
     _optionally_suppress,
-    _task_id_to_pod_name,
 )
 from airflow.utils import timezone
 from airflow.utils.session import create_session
@@ -992,7 +991,7 @@ class TestKubernetesPodOperator:
             random_name_suffix=False,
         )
         pod = k.build_pod_request_obj({})
-        assert pod.metadata.name == "0.hi.--09hi"
+        assert pod.metadata.name == "hi-09hi"
 
     def test_task_id_as_name_with_suffix(self):
         k = KubernetesPodOperator(
@@ -1000,9 +999,9 @@ class TestKubernetesPodOperator:
             random_name_suffix=True,
         )
         pod = k.build_pod_request_obj({})
-        expected = "0.hi.--09hi"
-        assert pod.metadata.name.startswith(expected)
-        assert re.match(rf"{expected}-[a-z0-9-]+", pod.metadata.name) is not None
+        expected = "hi-09hi"
+        assert pod.metadata.name[: len(expected)] == expected
+        assert re.match(rf"{expected}-[a-z0-9]{{8}}", pod.metadata.name) is not None
 
     def test_task_id_as_name_with_suffix_very_long(self):
         k = KubernetesPodOperator(
@@ -1010,7 +1009,13 @@ class TestKubernetesPodOperator:
             random_name_suffix=True,
         )
         pod = k.build_pod_request_obj({})
-        assert re.match(r"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-[a-z0-9-]+", pod.metadata.name) is not None
+        assert (
+            re.match(
+                r"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-[a-z0-9]{8}",
+                pod.metadata.name,
+            )
+            is not None
+        )
 
     def test_task_id_as_name_dag_id_is_ignored(self):
         dag = DAG(dag_id="this_is_a_dag_name", start_date=pendulum.now())
@@ -1078,23 +1083,3 @@ class TestSuppress:
         with _optionally_suppress():
             print("hi")
         assert caplog.text == ""
-
-
-@pytest.mark.parametrize(
-    "val, expected",
-    [
-        ("task-id", "task-id"),  # no problem
-        ("task_id", "task-id"),  # underscores
-        ("task.id", "task.id"),  # dots ok
-        (".task.id", "0.task.id"),  # leading dot invalid
-        ("-90Abc*&", "0-90abc--0"),  # invalid ends
-        ("90AçLbˆˆç˙ßߘ˜˙c*a", "90a-lb---------c-a"),  # weird unicode
-    ],
-)
-def test_task_id_to_pod_name(val, expected):
-    assert _task_id_to_pod_name(val) == expected
-
-
-def test_task_id_to_pod_name_long():
-    with pytest.raises(ValueError, match="longer than 253"):
-        _task_id_to_pod_name("0" * 254)