You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ds...@apache.org on 2022/10/21 15:14:49 UTC

[airflow] branch main updated: Make pod name optional in KubernetesPodOperator (#27120)

This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c9e57687b0 Make pod name optional in KubernetesPodOperator (#27120)
c9e57687b0 is described below

commit c9e57687b03807a36fac1c2c03ccf8ebb2e802b9
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Fri Oct 21 08:14:40 2022 -0700

    Make pod name optional in KubernetesPodOperator (#27120)
    
    When `name` is not provided, fall back to the task_id as the pod name.  The pod name is of little consequence because a random suffix is appended to it by default, so unless the user also disables randomization, the name doesn't need extra details like dag_id, run_id, or map_index.
---
 airflow/providers/cncf/kubernetes/CHANGELOG.rst    | 13 +++++++----
 .../cncf/kubernetes/operators/kubernetes_pod.py    | 19 +++++++--------
 .../kubernetes/operators/test_kubernetes_pod.py    | 27 ++++++++++++----------
 3 files changed, 34 insertions(+), 25 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/CHANGELOG.rst b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
index 5a6de666f8..179f4cf071 100644
--- a/airflow/providers/cncf/kubernetes/CHANGELOG.rst
+++ b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
@@ -32,6 +32,11 @@ Breaking changes
 
 Previously KubernetesPodOperator considered some settings from the Airflow config's ``kubernetes`` section.  Such consideration was deprecated in 4.1.0 and is now removed.  If you previously relied on the Airflow config, and you want client generation to have non-default configuration, you will need to define your configuration in an Airflow connection and set KPO to use the connection.  See kubernetes provider documentation on defining a kubernetes Airflow connection for details.
 
+Features
+~~~~~~~~
+
+Previously, ``name`` was a required argument for KubernetesPodOperator unless a pod template file or full pod spec was supplied. Now, if ``name`` is not supplied, ``task_id`` will be used as the pod name.
+
 4.4.0
 .....
 
@@ -225,7 +230,7 @@ Features
 ~~~~~~~~
 
 * ``Add map_index label to mapped KubernetesPodOperator (#21916)``
-* ``Change KubePodOperator labels from exeuction_date to run_id (#21960)``
+* ``Change KubernetesPodOperator labels from execution_date to run_id (#21960)``
 
 Misc
 ~~~~
@@ -490,7 +495,7 @@ Breaking changes
 Features
 ~~~~~~~~
 
-* ``Add 'KubernetesPodOperat' 'pod-template-file' jinja template support (#15942)``
+* ``Add 'KubernetesPodOperator' 'pod-template-file' jinja template support (#15942)``
 * ``Save pod name to xcom for KubernetesPodOperator (#15755)``
 
 Bug Fixes
@@ -499,7 +504,7 @@ Bug Fixes
 * ``Bug Fix Pod-Template Affinity Ignored due to empty Affinity K8S Object (#15787)``
 * ``Bug Pod Template File Values Ignored (#16095)``
 * ``Fix issue with parsing error logs in the KPO (#15638)``
-* ``Fix unsuccessful KubernetesPod final_state call when 'is_delete_operator_pod=True' (#15490)``
+* ``Fix unsuccessful KubernetesPodOperator final_state call when 'is_delete_operator_pod=True' (#15490)``
 
 .. Below changes are excluded from the changelog. Move them to
    appropriate section above if needed. Do not delete the lines(!):
@@ -521,7 +526,7 @@ Bug Fixes
 ~~~~~~~~~
 
 * ``Fix timeout when using XCom with KubernetesPodOperator (#15388)``
-* ``Fix labels on the pod created by ''KubernetsPodOperator'' (#15492)``
+* ``Fix labels on the pod created by ''KubernetesPodOperator'' (#15492)``
 
 1.1.0
 .....
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index e0043816a9..892f17e138 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -484,14 +484,12 @@ class KubernetesPodOperator(BaseOperator):
         labels_value += ',!airflow-worker'
         return labels_value
 
-    def _set_name(self, name: str | None) -> str | None:
-        if name is None:
-            if self.pod_template_file or self.full_pod_spec:
-                return None
-            raise AirflowException("`name` is required unless `pod_template_file` or `full_pod_spec` is set")
-
-        validate_key(name, max_length=220)
-        return re.sub(r'[^a-z0-9-]+', '-', name.lower())
+    @staticmethod
+    def _set_name(name: str | None) -> str | None:
+        if name is not None:
+            validate_key(name, max_length=220)
+            return re.sub(r'[^a-z0-9-]+', '-', name.lower())
+        return None
 
     def patch_already_checked(self, pod: k8s.V1Pod):
         """Add an "already checked" annotation to ensure we don't reattach on retries"""
@@ -526,7 +524,7 @@ class KubernetesPodOperator(BaseOperator):
         elif self.full_pod_spec:
             pod_template = self.full_pod_spec
         else:
-            pod_template = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="name"))
+            pod_template = k8s.V1Pod(metadata=k8s.V1ObjectMeta())
 
         pod = k8s.V1Pod(
             api_version="v1",
@@ -571,6 +569,9 @@ class KubernetesPodOperator(BaseOperator):
 
         pod = PodGenerator.reconcile_pods(pod_template, pod)
 
+        if not pod.metadata.name:
+            pod.metadata.name = self.task_id
+
         if self.random_name_suffix:
             pod.metadata.name = PodGenerator.make_unique_pod_id(pod.metadata.name)
 
diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
index fbcd027aaf..3c93933fb4 100644
--- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
@@ -16,8 +16,9 @@
 # under the License.
 from __future__ import annotations
 
+import re
 from unittest import mock
-from unittest.mock import MagicMock
+from unittest.mock import MagicMock, patch
 
 import pendulum
 import pytest
@@ -292,6 +293,19 @@ class TestKubernetesPodOperator:
         pod = k.build_pod_request_obj(create_context(k))
         assert pod.spec.image_pull_secrets == [k8s.V1LocalObjectReference(name=fake_pull_secrets)]
 
+    @patch(f"{KPO_MODULE}.KubernetesPodOperator.find_pod")
+    def test_omitted_name(self, mock_find):
+        k = KubernetesPodOperator(
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=["echo 10"],
+            task_id="this-task-name",
+        )
+
+        context = create_context(k)
+        pod = k.build_pod_request_obj(context)
+        assert re.match('this-task-name-[a-z0-9]+', pod.metadata.name) is not None
+
     def test_image_pull_policy_correctly_set(self):
         k = KubernetesPodOperator(
             namespace="default",
@@ -378,17 +392,6 @@ class TestKubernetesPodOperator:
         else:
             assert pod.metadata.name == name_base
 
-    def test_pod_name_required(self):
-        with pytest.raises(AirflowException, match="`name` is required"):
-            KubernetesPodOperator(
-                namespace="default",
-                image="ubuntu:16.04",
-                task_id="task",
-                in_cluster=False,
-                do_xcom_push=False,
-                cluster_context="default",
-            )
-
     @pytest.fixture
     def pod_spec(self):
         return k8s.V1Pod(