You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/01/29 14:51:58 UTC

[airflow] branch master updated: Fix invalid value error caused by long k8s pod name (#13299)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 862443f  Fix invalid value error caused by long k8s pod name (#13299)
862443f is described below

commit 862443f6d3669411abfb83082c29c2fad7fcf12d
Author: QP Hou <qp...@scribd.com>
AuthorDate: Fri Jan 29 06:51:29 2021 -0800

    Fix invalid value error caused by long k8s pod name (#13299)
    
    K8S pod names follow the DNS_SUBDOMAIN naming convention, which can be
    broken down into one or more DNS_LABELs separated by `.`.
    
    While the max length of a pod name (DNS_SUBDOMAIN) is 253, each label
    component (DNS_LABEL) of the name cannot be longer than 63. Pod names
    generated by the k8s executor right now only contain one label, which
    means the total effective name length cannot be greater than 63.
    
    This patch concatenates the uuid to pod_id using `.` to generate the pod
    name, thus extending the max name length to 63 + 1 + len(uuid) (the
    trimmed pod_id label, the `.` separator, and the uuid label).
    
    Reference: https://github.com/kubernetes/kubernetes/blob/release-1.1/docs/design/identifiers.md
    Relevant discussion: https://github.com/kubernetes/kubernetes/issues/79351#issuecomment-505228196
---
 airflow/executors/kubernetes_executor.py | 38 +++++++++--------------
 airflow/kubernetes/pod_generator.py      | 27 +++++++++++------
 tests/kubernetes/models/test_secret.py   |  2 +-
 tests/kubernetes/test_pod_generator.py   | 52 ++++++++++++++++++++++++++++++--
 4 files changed, 83 insertions(+), 36 deletions(-)

diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 36bfdd1..88e26be 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -42,7 +42,7 @@ from airflow.kubernetes import pod_generator
 from airflow.kubernetes.kube_client import get_kube_client
 from airflow.kubernetes.kube_config import KubeConfig
 from airflow.kubernetes.kubernetes_helper_functions import create_pod_id
-from airflow.kubernetes.pod_generator import MAX_POD_ID_LEN, PodGenerator
+from airflow.kubernetes.pod_generator import PodGenerator
 from airflow.kubernetes.pod_launcher import PodLauncher
 from airflow.models import TaskInstance
 from airflow.models.taskinstance import TaskInstanceKey
@@ -367,24 +367,6 @@ class AirflowKubernetesScheduler(LoggingMixin):
 
         return TaskInstanceKey(dag_id, task_id, execution_date, try_number)
 
-    @staticmethod
-    def _make_safe_pod_id(safe_dag_id: str, safe_task_id: str, safe_uuid: str) -> str:
-        r"""
-        Kubernetes pod names must be <= 253 chars and must pass the following regex for
-        validation
-        ``^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$``
-
-        :param safe_dag_id: a dag_id with only alphanumeric characters
-        :param safe_task_id: a task_id with only alphanumeric characters
-        :param safe_uuid: a uuid
-        :return: ``str`` valid Pod name of appropriate length
-        """
-        safe_key = safe_dag_id + safe_task_id
-
-        safe_pod_id = safe_key[: MAX_POD_ID_LEN - len(safe_uuid) - 1] + "-" + safe_uuid
-
-        return safe_pod_id
-
     def _flush_watcher_queue(self) -> None:
         self.log.debug('Executor shutting down, watcher_queue approx. size=%d', self.watcher_queue.qsize())
         while True:
@@ -468,7 +450,7 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
                 pod_generator.make_safe_label_value(task.dag_id),
                 pod_generator.make_safe_label_value(task.task_id),
                 pod_generator.datetime_to_label_safe_datestring(task.execution_date),
-                self.scheduler_job_id,
+                pod_generator.make_safe_label_value(str(self.scheduler_job_id)),
             )
             # pylint: enable=protected-access
             kwargs = dict(label_selector=dict_string)
@@ -603,10 +585,16 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
         tis_to_flush = [ti for ti in tis if not ti.external_executor_id]
         scheduler_job_ids = [ti.external_executor_id for ti in tis]
         pod_ids = {
-            create_pod_id(dag_id=ti.dag_id, task_id=ti.task_id): ti for ti in tis if ti.external_executor_id
+            create_pod_id(
+                dag_id=pod_generator.make_safe_label_value(ti.dag_id),
+                task_id=pod_generator.make_safe_label_value(ti.task_id),
+            ): ti
+            for ti in tis
+            if ti.external_executor_id
         }
         kube_client: client.CoreV1Api = self.kube_client
         for scheduler_job_id in scheduler_job_ids:
+            scheduler_job_id = pod_generator.make_safe_label_value(str(scheduler_job_id))
             kwargs = {'label_selector': f'airflow-worker={scheduler_job_id}'}
             pod_list = kube_client.list_namespaced_pod(namespace=self.kube_config.kube_namespace, **kwargs)
             for pod in pod_list.items:
@@ -624,7 +612,9 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
         :param pod_ids: pod_ids we expect to patch.
         """
         self.log.info("attempting to adopt pod %s", pod.metadata.name)
-        pod.metadata.labels['airflow-worker'] = str(self.scheduler_job_id)
+        pod.metadata.labels['airflow-worker'] = pod_generator.make_safe_label_value(
+            str(self.scheduler_job_id)
+        )
         dag_id = pod.metadata.labels['dag_id']
         task_id = pod.metadata.labels['task_id']
         pod_id = create_pod_id(dag_id=dag_id, task_id=task_id)
@@ -659,7 +649,9 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
         pod_list = kube_client.list_namespaced_pod(namespace=self.kube_config.kube_namespace, **kwargs)
         for pod in pod_list.items:
             self.log.info("Attempting to adopt pod %s", pod.metadata.name)
-            pod.metadata.labels['airflow-worker'] = str(self.scheduler_job_id)
+            pod.metadata.labels['airflow-worker'] = pod_generator.make_safe_label_value(
+                str(self.scheduler_job_id)
+            )
             try:
                 kube_client.patch_namespaced_pod(
                     name=pod.metadata.name,
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index deee427..0782f1a 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -39,8 +39,6 @@ from airflow.exceptions import AirflowConfigException
 from airflow.kubernetes.pod_generator_deprecated import PodGenerator as PodGeneratorDeprecated
 from airflow.version import version as airflow_version
 
-MAX_POD_ID_LEN = 253
-
 MAX_LABEL_LEN = 63
 
 
@@ -355,7 +353,7 @@ class PodGenerator:
         pod_override_object: Optional[k8s.V1Pod],
         base_worker_pod: k8s.V1Pod,
         namespace: str,
-        scheduler_job_id: str,
+        scheduler_job_id: int,
     ) -> k8s.V1Pod:
         """
         Construct a pod by gathering and consolidating the configuration from 3 places:
@@ -370,6 +368,10 @@ class PodGenerator:
         except Exception:  # pylint: disable=W0703
             image = kube_image
 
+        task_id = make_safe_label_value(task_id)
+        dag_id = make_safe_label_value(dag_id)
+        scheduler_job_id = make_safe_label_value(str(scheduler_job_id))
+
         dynamic_pod = k8s.V1Pod(
             metadata=k8s.V1ObjectMeta(
                 namespace=namespace,
@@ -381,7 +383,7 @@ class PodGenerator:
                 },
                 name=PodGenerator.make_unique_pod_id(pod_id),
                 labels={
-                    'airflow-worker': str(scheduler_job_id),
+                    'airflow-worker': scheduler_job_id,
                     'dag_id': dag_id,
                     'task_id': task_id,
                     'execution_date': datetime_to_label_safe_datestring(date),
@@ -450,20 +452,27 @@ class PodGenerator:
         return api_client._ApiClient__deserialize_model(pod_dict, k8s.V1Pod)  # pylint: disable=W0212
 
     @staticmethod
-    def make_unique_pod_id(pod_id):
+    def make_unique_pod_id(pod_id: str) -> str:
         r"""
-        Kubernetes pod names must be <= 253 chars and must pass the following regex for
-        validation
+        Kubernetes pod names must consist of one or more lowercase
+        rfc1035/rfc1123 labels separated by '.' with a maximum length of 253
+        characters. Each label has a maximum length of 63 characters.
+
+        Name must pass the following regex for validation
         ``^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$``
 
+        For more details, see:
+        https://github.com/kubernetes/kubernetes/blob/release-1.1/docs/design/identifiers.md
+
         :param pod_id: a dag_id with only alphanumeric characters
         :return: ``str`` valid Pod name of appropriate length
         """
         if not pod_id:
             return None
 
-        safe_uuid = uuid.uuid4().hex
-        safe_pod_id = pod_id[: MAX_POD_ID_LEN - len(safe_uuid) - 1] + "-" + safe_uuid
+        safe_uuid = uuid.uuid4().hex  # safe uuid will always be less than 63 chars
+        trimmed_pod_id = pod_id[:MAX_LABEL_LEN]
+        safe_pod_id = f"{trimmed_pod_id}.{safe_uuid}"
 
         return safe_pod_id
 
diff --git a/tests/kubernetes/models/test_secret.py b/tests/kubernetes/models/test_secret.py
index c4f0ed8..3e9e609 100644
--- a/tests/kubernetes/models/test_secret.py
+++ b/tests/kubernetes/models/test_secret.py
@@ -83,7 +83,7 @@ class TestSecret(unittest.TestCase):
             'kind': 'Pod',
             'metadata': {
                 'labels': {'app': 'myapp'},
-                'name': 'myapp-pod-cf4a56d281014217b0272af6216feb48',
+                'name': 'myapp-pod.cf4a56d281014217b0272af6216feb48',
                 'namespace': 'default',
             },
             'spec': {
diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py
index 0c2efe5..17a942a 100644
--- a/tests/kubernetes/test_pod_generator.py
+++ b/tests/kubernetes/test_pod_generator.py
@@ -14,6 +14,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import os
 import sys
 import unittest
 import uuid
@@ -22,6 +23,7 @@ from unittest import mock
 import pytest
 from dateutil import parser
 from kubernetes.client import ApiClient, models as k8s
+from parameterized import parameterized
 
 from airflow import __version__
 from airflow.exceptions import AirflowConfigException
@@ -107,7 +109,7 @@ class TestPodGenerator(unittest.TestCase):
             kind="Pod",
             metadata=k8s.V1ObjectMeta(
                 namespace="default",
-                name='myapp-pod-' + self.static_uuid.hex,
+                name='myapp-pod.' + self.static_uuid.hex,
                 labels={'app': 'myapp'},
             ),
             spec=k8s.V1PodSpec(
@@ -424,7 +426,7 @@ class TestPodGenerator(unittest.TestCase):
         expected.metadata.labels = self.labels
         expected.metadata.labels['app'] = 'myapp'
         expected.metadata.annotations = self.annotations
-        expected.metadata.name = 'pod_id-' + self.static_uuid.hex
+        expected.metadata.name = 'pod_id.' + self.static_uuid.hex
         expected.metadata.namespace = 'test_namespace'
         expected.spec.containers[0].args = ['command']
         expected.spec.containers[0].image = 'airflow_image'
@@ -466,7 +468,7 @@ class TestPodGenerator(unittest.TestCase):
         worker_config.metadata.annotations = self.annotations
         worker_config.metadata.labels = self.labels
         worker_config.metadata.labels['app'] = 'myapp'
-        worker_config.metadata.name = 'pod_id-' + self.static_uuid.hex
+        worker_config.metadata.name = 'pod_id.' + self.static_uuid.hex
         worker_config.metadata.namespace = 'namespace'
         worker_config.spec.containers[0].env.append(
             k8s.V1EnvVar(name="AIRFLOW_IS_K8S_EXECUTOR_POD", value='True')
@@ -474,6 +476,30 @@ class TestPodGenerator(unittest.TestCase):
         worker_config_result = self.k8s_client.sanitize_for_serialization(worker_config)
         assert worker_config_result == sanitized_result
 
+    @mock.patch('uuid.uuid4')
+    def test_ensure_max_label_length(self, mock_uuid):
+        mock_uuid.return_value = self.static_uuid
+        path = os.path.join(os.path.dirname(__file__), 'pod_generator_base_with_secrets.yaml')
+        worker_config = PodGenerator.deserialize_model_file(path)
+
+        result = PodGenerator.construct_pod(
+            dag_id='a' * 512,
+            task_id='a' * 512,
+            pod_id='a' * 512,
+            kube_image='a' * 512,
+            try_number=3,
+            date=self.execution_date,
+            args=['command'],
+            namespace='namespace',
+            scheduler_job_id='a' * 512,
+            pod_override_object=None,
+            base_worker_pod=worker_config,
+        )
+
+        assert result.metadata.name == 'a' * 63 + '.' + self.static_uuid.hex
+        for _, v in result.metadata.labels.items():
+            assert len(v) <= 63
+
     def test_merge_objects_empty(self):
         annotations = {'foo1': 'bar1'}
         base_obj = k8s.V1ObjectMeta(annotations=annotations)
@@ -607,6 +633,26 @@ class TestPodGenerator(unittest.TestCase):
         sanitized_res = self.k8s_client.sanitize_for_serialization(result)
         assert sanitized_res == self.deserialize_result
 
+    @parameterized.expand(
+        (
+            ("max_label_length", "a" * 63),
+            ("max_subdomain_length", "a" * 253),
+            (
+                "tiny",
+                "aaa",
+            ),
+        )
+    )
+    def test_pod_name_confirm_to_max_length(self, _, pod_id):
+        name = PodGenerator.make_unique_pod_id(pod_id)
+        assert len(name) <= 253
+        parts = name.split(".")
+        if len(pod_id) <= 63:
+            assert len(parts[0]) == len(pod_id)
+        else:
+            assert len(parts[0]) <= 63
+        assert len(parts[1]) <= 63
+
     def test_deserialize_model_string(self):
         fixture = """
 apiVersion: v1