You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ds...@apache.org on 2022/10/22 19:27:46 UTC

[airflow] branch main updated: Make namespace optional for KPO (#27116)

This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3ecb8dd025 Make namespace optional for KPO (#27116)
3ecb8dd025 is described below

commit 3ecb8dd0259abfce37513509e8f67b9ede72af21
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Sat Oct 22 12:27:38 2022 -0700

    Make namespace optional for KPO (#27116)
    
    In KPO, currently, if you do not provide namespace as a KPO arg (and it's not otherwise specified through pod template or full pod spec) the task will fail when trying to create the pod, because the kube client does not fill it in for you like e.g. kubectl does.
    
    This PR makes namespace optional.
    
    If it's not specified through KPO arg, or full pod spec, or pod template, then first we'll check the hook to see if you've configured a namespace there. And if that is unspecified, if we're in a cluster already, we'll check /var/run/secrets/kubernetes.io/serviceaccount/namespace for the value. Finally, we'll use 'default'.
---
 airflow/providers/cncf/kubernetes/CHANGELOG.rst    |   4 +-
 .../cncf/kubernetes/operators/kubernetes_pod.py    |  14 +-
 .../operators.rst                                  |  16 +-
 .../kubernetes/operators/test_kubernetes_pod.py    | 486 ++++++++++-----------
 4 files changed, 259 insertions(+), 261 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/CHANGELOG.rst b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
index 179f4cf071..6a0c5edb29 100644
--- a/airflow/providers/cncf/kubernetes/CHANGELOG.rst
+++ b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
@@ -35,7 +35,9 @@ Previously KubernetesPodOperator considered some settings from the Airflow confi
 Features
 ~~~~~~~~
 
-Previously, ``name`` was a required argument for KubernetesPodOperator (when also not supplying pod template or full pod spec). Now, if ``name`` is not supplied, ``task_id`` will be used.
+* Previously, ``name`` was a required argument for KubernetesPodOperator (when also not supplying pod template or full pod spec). Now, if ``name`` is not supplied, ``task_id`` will be used.
+* KubernetesPodOperator argument ``namespace`` is now optional.  If not supplied via KPO param or pod template file or full pod spec, then we'll check the airflow conn,
+  then if in a k8s pod, try to infer the namespace from the container, then finally will use the ``default`` namespace.
 
 4.4.0
 .....
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index 892f17e138..f37fb64698 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -287,6 +287,13 @@ class KubernetesPodOperator(BaseOperator):
         self.pod_request_obj: k8s.V1Pod | None = None
         self.pod: k8s.V1Pod | None = None
 
+    @cached_property
+    def _incluster_namespace(self):
+        from pathlib import Path
+
+        path = Path('/var/run/secrets/kubernetes.io/serviceaccount/namespace')
+        return path.exists() and path.read_text() or None
+
     def _render_nested_template_fields(
         self,
         content: Any,
@@ -575,6 +582,11 @@ class KubernetesPodOperator(BaseOperator):
         if self.random_name_suffix:
             pod.metadata.name = PodGenerator.make_unique_pod_id(pod.metadata.name)
 
+        if not pod.metadata.namespace:
+            hook_namespace = self.hook.conn_extras.get('extra__kubernetes__namespace')
+            pod_namespace = self.namespace or hook_namespace or self._incluster_namespace or 'default'
+            pod.metadata.namespace = pod_namespace
+
         for secret in self.secrets:
             self.log.debug("Adding secret to task %s", self.task_id)
             pod = secret.attach_to_pod(pod)
@@ -583,7 +595,7 @@ class KubernetesPodOperator(BaseOperator):
             pod = xcom_sidecar.add_xcom_sidecar(pod)
 
         labels = self._get_ti_pod_labels(context)
-        self.log.info("Creating pod %s with labels: %s", pod.metadata.name, labels)
+        self.log.info("Building pod %s with labels: %s", pod.metadata.name, labels)
 
         # Merge Pod Identifying labels with labels passed to operator
         pod.metadata.labels.update(labels)
diff --git a/docs/apache-airflow-providers-cncf-kubernetes/operators.rst b/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
index 1776ea2956..57966e7fdd 100644
--- a/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
+++ b/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
@@ -57,9 +57,7 @@ You can print out the Kubernetes manifest for the pod that would be created at r
 
 .. code-block:: python
 
-    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
-        KubernetesPodOperator,
-    )
+    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
 
     k = KubernetesPodOperator(
         name="hello-dry-run",
@@ -73,6 +71,18 @@ You can print out the Kubernetes manifest for the pod that would be created at r
 
     k.dry_run()
 
+Argument precedence
+^^^^^^^^^^^^^^^^^^^
+
+When building the pod object, there may be overlap between KPO params, pod spec, template and airflow connection.
+In general, the order of precedence is KPO argument > full pod spec > pod template file > airflow connection.
+
+For ``namespace``, if namespace is not provided via any of these methods, then we'll first try to
+get the current namespace (if the task is already running in kubernetes) and failing that we'll use
+the ``default`` namespace.
+
+For pod name, if not provided explicitly, we'll use the task_id. A random suffix is added by default so the pod
+name is not generally of great consequence.
 
 How to use cluster ConfigMaps, Secrets, and Volumes with Pod?
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
index 3c93933fb4..3eeb98bc8f 100644
--- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
@@ -17,6 +17,7 @@
 from __future__ import annotations
 
 import re
+from contextlib import nullcontext
 from unittest import mock
 from unittest.mock import MagicMock, patch
 
@@ -36,6 +37,7 @@ from tests.test_utils import db
 DEFAULT_DATE = timezone.datetime(2016, 1, 1, 1, 0, 0)
 KPO_MODULE = "airflow.providers.cncf.kubernetes.operators.kubernetes_pod"
 POD_MANAGER_CLASS = "airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager"
+POD_MANAGER_MODULE = "airflow.providers.cncf.kubernetes.utils.pod_manager"
 HOOK_CLASS = "airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesHook"
 
 
@@ -46,8 +48,12 @@ def clear_db():
     yield
 
 
-def create_context(task, persist_to_db=False):
-    dag = task.dag if task.has_dag() else DAG(dag_id="dag")
+def create_context(task, persist_to_db=False, map_index=None):
+    if task.has_dag():
+        dag = task.dag
+    else:
+        dag = DAG(dag_id="dag", start_date=pendulum.now())
+        dag.add_task(task)
     dag_run = DagRun(
         run_id=DagRun.generate_run_id(DagRunType.MANUAL, DEFAULT_DATE),
         run_type=DagRunType.MANUAL,
@@ -55,6 +61,8 @@ def create_context(task, persist_to_db=False):
     )
     task_instance = TaskInstance(task=task, run_id=dag_run.run_id)
     task_instance.dag_run = dag_run
+    if map_index is not None:
+        task_instance.map_index = map_index
     if persist_to_db:
         with create_session() as session:
             session.add(DagModel(dag_id=dag.dag_id))
@@ -75,19 +83,19 @@ def create_context(task, persist_to_db=False):
 class TestKubernetesPodOperator:
     @pytest.fixture(autouse=True)
     def setup(self, dag_maker):
-        self.create_pod_patch = mock.patch(f"{POD_MANAGER_CLASS}.create_pod")
-        self.await_pod_patch = mock.patch(f"{POD_MANAGER_CLASS}.await_pod_start")
-        self.await_pod_completion_patch = mock.patch(f"{POD_MANAGER_CLASS}.await_pod_completion")
-        self.hook_patch = mock.patch(HOOK_CLASS)
+        self.create_pod_patch = patch(f"{POD_MANAGER_CLASS}.create_pod")
+        self.await_pod_patch = patch(f"{POD_MANAGER_CLASS}.await_pod_start")
+        self.await_pod_completion_patch = patch(f"{POD_MANAGER_CLASS}.await_pod_completion")
+        self._default_client_patch = patch(f"{HOOK_CLASS}._get_default_client")
         self.create_mock = self.create_pod_patch.start()
         self.await_start_mock = self.await_pod_patch.start()
         self.await_pod_mock = self.await_pod_completion_patch.start()
-        self.hook_mock = self.hook_patch.start()
+        self._default_client_mock = self._default_client_patch.start()
         self.dag_maker = dag_maker
 
         yield
 
-        mock.patch.stopall()
+        patch.stopall()
 
     def run_pod(self, operator: KubernetesPodOperator, map_index: int = -1) -> k8s.V1Pod:
         with self.dag_maker(dag_id='dag') as dag:
@@ -103,15 +111,14 @@ class TestKubernetesPodOperator:
         remote_pod_mock = MagicMock()
         remote_pod_mock.status.phase = 'Succeeded'
         self.await_pod_mock.return_value = remote_pod_mock
-        if not isinstance(self.hook_mock.return_value.is_in_cluster, bool):
-            self.hook_mock.return_value.is_in_cluster = True
         operator.execute(context=context)
         return self.await_start_mock.call_args[1]['pod']
 
     def sanitize_for_serialization(self, obj):
         return ApiClient().sanitize_for_serialization(obj)
 
-    def test_config_path(self):
+    @patch(HOOK_CLASS)
+    def test_config_path(self, hook_mock):
         file_path = "/tmp/fake_file"
         k = KubernetesPodOperator(
             namespace="default",
@@ -121,20 +128,18 @@ class TestKubernetesPodOperator:
             labels={"foo": "bar"},
             name="test",
             task_id="task",
-            in_cluster=False,
             do_xcom_push=False,
             config_file=file_path,
-            cluster_context="default",
         )
         remote_pod_mock = MagicMock()
         remote_pod_mock.status.phase = 'Succeeded'
         self.await_pod_mock.return_value = remote_pod_mock
         self.run_pod(k)
-        self.hook_mock.assert_called_once_with(
+        hook_mock.assert_called_once_with(
+            cluster_context=None,
             conn_id=None,
-            in_cluster=False,
-            cluster_context="default",
             config_file=file_path,
+            in_cluster=None,
         )
 
     def test_env_vars(self):
@@ -159,61 +164,36 @@ class TestKubernetesPodOperator:
             'runAsUser': 1245,
         }
         k = KubernetesPodOperator(
-            namespace="default",
-            image="ubuntu:16.04",
-            cmds=["bash", "-cx"],
-            arguments=["echo 10"],
             security_context=security_context,
-            labels={"foo": "bar"},
-            name="test",
             task_id="task",
-            in_cluster=False,
-            do_xcom_push=False,
         )
-        pod = self.run_pod(k)
+        pod = k.build_pod_request_obj(create_context(k))
         assert pod.spec.security_context == security_context
 
     def test_container_security_context(self):
         container_security_context = {'allowPrivilegeEscalation': False}
         k = KubernetesPodOperator(
-            namespace="default",
-            image="ubuntu:16.04",
-            cmds=["bash", "-cx"],
-            arguments=["echo 10"],
             container_security_context=container_security_context,
-            labels={"foo": "bar"},
-            name="test",
             task_id="task",
-            in_cluster=False,
-            do_xcom_push=False,
         )
-        pod = self.run_pod(k)
+        pod = k.build_pod_request_obj(create_context(k))
         assert pod.spec.containers[0].security_context == container_security_context
 
     def test_envs_from_configmaps(
         self,
     ):
-        configmap_name = "test-config-map"
-        env_from = [k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name=configmap_name))]
-        # WHEN
+        env_from = [k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='test-config-map'))]
         k = KubernetesPodOperator(
-            namespace='default',
-            image="ubuntu:16.04",
-            cmds=["bash", "-cx"],
-            arguments=["echo 10"],
-            labels={"foo": "bar"},
-            name="test",
             task_id="task",
-            in_cluster=False,
-            do_xcom_push=False,
             env_from=env_from,
         )
-        pod = self.run_pod(k)
+        pod = k.build_pod_request_obj(create_context(k))
         assert pod.spec.containers[0].env_from == env_from
 
     @pytest.mark.parametrize(("in_cluster",), ([True], [False]))
-    def test_labels(self, in_cluster):
-        self.hook_mock.return_value.is_in_cluster = in_cluster
+    @patch(HOOK_CLASS)
+    def test_labels(self, hook_mock, in_cluster):
+        hook_mock.return_value.is_in_cluster = in_cluster
         k = KubernetesPodOperator(
             namespace="default",
             image="ubuntu:16.04",
@@ -238,13 +218,10 @@ class TestKubernetesPodOperator:
 
     def test_labels_mapped(self):
         k = KubernetesPodOperator(
-            namespace="default",
-            image="ubuntu:16.04",
-            cmds=["bash", "-cx"],
             name="test",
             task_id="task",
         )
-        pod = self.run_pod(k, map_index=10)
+        pod = k.build_pod_request_obj(create_context(k, map_index=10))
         assert pod.metadata.labels == {
             "dag_id": "dag",
             "kubernetes_pod_operator": "True",
@@ -253,9 +230,10 @@ class TestKubernetesPodOperator:
             "airflow_version": mock.ANY,
             "run_id": "test",
             "map_index": "10",
-            "airflow_kpo_in_cluster": "True",
+            "airflow_kpo_in_cluster": str(k.hook.is_in_cluster),
         }
 
+    @patch(HOOK_CLASS, new=MagicMock)
     def test_find_pod_labels(self):
         k = KubernetesPodOperator(
             namespace="default",
@@ -277,111 +255,224 @@ class TestKubernetesPodOperator:
     def test_image_pull_secrets_correctly_set(self):
         fake_pull_secrets = "fakeSecret"
         k = KubernetesPodOperator(
-            namespace="default",
-            image="ubuntu:16.04",
-            cmds=["bash", "-cx"],
-            arguments=["echo 10"],
-            labels={"foo": "bar"},
-            name="test",
             task_id="task",
-            in_cluster=False,
-            do_xcom_push=False,
             image_pull_secrets=[k8s.V1LocalObjectReference(fake_pull_secrets)],
-            cluster_context="default",
         )
 
         pod = k.build_pod_request_obj(create_context(k))
         assert pod.spec.image_pull_secrets == [k8s.V1LocalObjectReference(name=fake_pull_secrets)]
 
+    def test_omitted_name(self):
+        k = KubernetesPodOperator(task_id="this-task-name")
+        pod = k.build_pod_request_obj(create_context(k))
+        assert re.match('this-task-name-[a-z0-9]+', pod.metadata.name) is not None
+
+    @pytest.mark.parametrize('use_template', [True, False])
+    @pytest.mark.parametrize('use_pod_spec', [True, False])
+    @patch('pathlib.Path')
     @patch(f"{KPO_MODULE}.KubernetesPodOperator.find_pod")
-    def test_omitted_name(self, mock_find):
+    @patch.dict(
+        "os.environ", AIRFLOW_CONN_MY_CONN='{"extra": {"extra__kubernetes__namespace": "extra-namespace"}}'
+    )
+    def test_omitted_namespace_with_conn(
+        self, mock_find, mock_path, pod_template_file, use_template, pod_spec, use_pod_spec
+    ):
+        """
+        Namespace precedence is as follows:
+            - KPO
+            - airflow connection
+            - infer from k8s when in a cluster
+            - 'default' namespace as a fallback
+
+        Here we check when KPO namespace is omitted but we do have a conn where namespace is defined.
+        In this case, the namespace should be as defined in connection.
+        """
+        k = KubernetesPodOperator(
+            task_id="task",
+            kubernetes_conn_id='my_conn',
+            **(dict(pod_template_file=pod_template_file) if use_template else {}),
+            **(dict(full_pod_spec=pod_spec) if use_pod_spec else {}),
+        )
+        context = create_context(k)
+        pod = k.build_pod_request_obj(context)
+        mock_path.assert_not_called()
+        if use_pod_spec:
+            expected_namespace = 'podspecnamespace'
+        elif use_template:
+            expected_namespace = 'templatenamespace'
+        else:
+            expected_namespace = 'extra-namespace'
+        assert pod.metadata.namespace == expected_namespace
+        mock_find.return_value = pod
+        k.get_or_create_pod(
+            pod_request_obj=pod,
+            context=context,
+        )
+        mock_find.assert_called_once_with(expected_namespace, context=context)
+
+    @patch('pathlib.Path')
+    @patch(f"{KPO_MODULE}.KubernetesPodOperator.find_pod")
+    @patch.dict("os.environ", AIRFLOW_CONN_MY_CONN='{"extra": {}}')
+    def test_omitted_namespace_with_conn_no_value(self, mock_find, mock_path):
+        """
+        Namespace precedence is as follows:
+            - KPO
+            - airflow connection
+            - infer from k8s when in a cluster
+            - 'default' namespace as a fallback
+
+        Here we check when KPO namespace is omitted and the conn exists but has no namespace defined.
+        In this case, we should continue down the chain.
+        Here we mock not in k8s and therefore get 'default'.
+        """
         k = KubernetesPodOperator(
             image="ubuntu:16.04",
             cmds=["bash", "-cx"],
             arguments=["echo 10"],
-            task_id="this-task-name",
+            task_id="task",
+            name='hello',
+            kubernetes_conn_id='my_conn',
         )
 
+        mock_path.return_value.exists.return_value = False
         context = create_context(k)
         pod = k.build_pod_request_obj(context)
-        assert re.match('this-task-name-[a-z0-9]+', pod.metadata.name) is not None
+        mock_path.assert_called()
+        assert pod.metadata.namespace == 'default'
+        mock_find.return_value = pod
+        k.get_or_create_pod(
+            pod_request_obj=pod,
+            context=context,
+        )
+        mock_find.assert_called_once_with('default', context=context)
 
-    def test_image_pull_policy_correctly_set(self):
+    @patch('pathlib.Path')
+    @patch(f"{KPO_MODULE}.KubernetesPodOperator.find_pod")
+    def test_omitted_namespace_no_conn(self, mock_find, mock_path):
+        """
+        Namespace precedence is as follows:
+            - KPO
+            - airflow connection
+            - infer from k8s when in a cluster
+            - 'default' namespace as a fallback
+
+        Here we check when KPO omitted and no airflow connection, but we are in k8s.
+        In this case, we should use the value from k8s.
+        """
+        mock_path.return_value.exists.return_value = True
+        mock_path.return_value.read_text.return_value = 'abc'
         k = KubernetesPodOperator(
-            namespace="default",
             image="ubuntu:16.04",
             cmds=["bash", "-cx"],
             arguments=["echo 10"],
-            labels={"foo": "bar"},
-            name="test",
             task_id="task",
-            in_cluster=False,
-            do_xcom_push=False,
-            image_pull_policy="Always",
-            cluster_context="default",
+            name='hello',
         )
-        pod = k.build_pod_request_obj(create_context(k))
-        assert pod.spec.containers[0].image_pull_policy == "Always"
 
-    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.delete_pod")
-    @mock.patch("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator.find_pod")
-    def test_pod_delete_even_on_launcher_error(self, find_pod_mock, delete_pod_mock):
+        context = create_context(k)
+        pod = k.build_pod_request_obj(context)
+        mock_path.assert_called_once_with('/var/run/secrets/kubernetes.io/serviceaccount/namespace')
+        assert pod.metadata.namespace == 'abc'
+        mock_find.return_value = pod
+        k.get_or_create_pod(
+            pod_request_obj=pod,
+            context=context,
+        )
+        mock_find.assert_called_once_with('abc', context=context)
+
+    @patch('pathlib.Path')
+    @patch(f"{KPO_MODULE}.KubernetesPodOperator.find_pod")
+    def test_omitted_namespace_no_conn_not_in_k8s(self, mock_find, mock_path):
+        """
+        Namespace precedence is as follows:
+            - KPO
+            - airflow connection
+            - infer from k8s when in a cluster
+            - 'default' namespace as a fallback
+
+        Here we check when KPO omitted and no airflow connection and not in a k8s pod.
+        In this case we should end up with the 'default' namespace.
+        """
+        mock_path.return_value.exists.return_value = False
         k = KubernetesPodOperator(
-            namespace="default",
             image="ubuntu:16.04",
             cmds=["bash", "-cx"],
             arguments=["echo 10"],
-            labels={"foo": "bar"},
-            name="test",
             task_id="task",
-            in_cluster=False,
-            do_xcom_push=False,
-            cluster_context="default",
-            is_delete_operator_pod=True,
+            name='hello',
         )
+
+        context = create_context(k)
+        pod = k.build_pod_request_obj(context)
+        mock_path.assert_called_once_with('/var/run/secrets/kubernetes.io/serviceaccount/namespace')
+        assert pod.metadata.namespace == 'default'
+        mock_find.return_value = pod
+        k.get_or_create_pod(
+            pod_request_obj=pod,
+            context=context,
+        )
+        mock_find.assert_called_once_with('default', context=context)
+
+    def test_image_pull_policy_correctly_set(self):
+        k = KubernetesPodOperator(
+            task_id="task",
+            image_pull_policy="Always",
+        )
+        pod = k.build_pod_request_obj(create_context(k))
+        assert pod.spec.containers[0].image_pull_policy == "Always"
+
+    @patch(f"{POD_MANAGER_CLASS}.delete_pod")
+    @patch(f"{KPO_MODULE}.KubernetesPodOperator.find_pod")
+    def test_pod_delete_after_await_container_error(self, find_pod_mock, delete_pod_mock):
+        """
+        When KPO fails unexpectedly during await_container, we should still try to delete the pod,
+        and the pod we try to delete should be the one returned from find_pod earlier.
+        """
+        cont_status = MagicMock()
+        cont_status.name = 'base'
+        cont_status.state.terminated.message = 'my-failure'
+        find_pod_mock.return_value.status.container_statuses = [cont_status]
+        k = KubernetesPodOperator(task_id="task")
         self.await_pod_mock.side_effect = AirflowException("fake failure")
-        with pytest.raises(AirflowException):
+        with pytest.raises(AirflowException, match='my-failure'):
             context = create_context(k)
             k.execute(context=context)
-        assert delete_pod_mock.called
+        delete_pod_mock.assert_called_with(find_pod_mock.return_value)
 
-    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.delete_pod")
-    @mock.patch("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator.find_pod")
-    def test_pod_not_deleting_non_existing_pod(self, find_pod_mock, delete_pod_mock):
-
-        find_pod_mock.return_value = None
+    @pytest.mark.parametrize('should_fail', [True, False])
+    @patch(f"{POD_MANAGER_CLASS}.delete_pod")
+    @patch(f"{POD_MANAGER_CLASS}.await_pod_completion")
+    def test_pod_delete_not_called_when_creation_fails(self, await_pod_mock, delete_pod_mock, should_fail):
+        """
+        When pod creation fails, we never get a read of the remote pod.  In this case we don't attempt
+        to delete the pod.
+        """
         k = KubernetesPodOperator(
-            namespace="default",
-            image="ubuntu:16.04",
-            cmds=["bash", "-cx"],
-            arguments=["echo 10"],
-            labels={"foo": "bar"},
-            name="test",
             task_id="task",
-            in_cluster=False,
-            do_xcom_push=False,
-            cluster_context="default",
             is_delete_operator_pod=True,
         )
-        self.create_mock.side_effect = AirflowException("fake failure")
-        with pytest.raises(AirflowException):
-            context = create_context(k)
-            k.execute(context=context)
-        delete_pod_mock.assert_not_called()
+
+        if should_fail:
+            self.create_mock.side_effect = AirflowException("fake failure")
+        else:
+            await_pod_mock.return_value.status.phase = 'Succeeded'
+
+        cm = pytest.raises(AirflowException) if should_fail else nullcontext()
+        with cm:
+            self.run_pod(k)
+
+        if should_fail:
+            delete_pod_mock.assert_not_called()
+        else:
+            delete_pod_mock.assert_called()
 
     @pytest.mark.parametrize('randomize', [True, False])
     def test_provided_pod_name(self, randomize):
         name_base = "test"
-
         k = KubernetesPodOperator(
-            namespace="default",
-            image="ubuntu:16.04",
             name=name_base,
             random_name_suffix=randomize,
             task_id="task",
-            in_cluster=False,
-            do_xcom_push=False,
-            cluster_context="default",
         )
         context = create_context(k)
         pod = k.build_pod_request_obj(context)
@@ -395,7 +486,7 @@ class TestKubernetesPodOperator:
     @pytest.fixture
     def pod_spec(self):
         return k8s.V1Pod(
-            metadata=k8s.V1ObjectMeta(name="hello", labels={"foo": "bar"}, namespace="mynamespace"),
+            metadata=k8s.V1ObjectMeta(name="hello", labels={"foo": "bar"}, namespace="podspecnamespace"),
             spec=k8s.V1PodSpec(
                 containers=[
                     k8s.V1Container(
@@ -414,12 +505,10 @@ class TestKubernetesPodOperator:
         k = KubernetesPodOperator(
             task_id="task",
             random_name_suffix=randomize_name,
-            in_cluster=False,
-            do_xcom_push=False,
-            cluster_context="default",
             full_pod_spec=pod_spec,
         )
-        pod = self.run_pod(k)
+        context = create_context(k)
+        pod = k.build_pod_request_obj(context)
 
         if randomize_name:
             assert pod.metadata.name.startswith(pod_spec_name_base)
@@ -438,7 +527,7 @@ class TestKubernetesPodOperator:
             "task_id": "task",
             "try_number": "1",
             "airflow_version": mock.ANY,
-            "airflow_kpo_in_cluster": "True",
+            "airflow_kpo_in_cluster": str(k.hook.is_in_cluster),
             "run_id": "test",
         }
 
@@ -450,15 +539,12 @@ class TestKubernetesPodOperator:
         k = KubernetesPodOperator(
             task_id="task",
             random_name_suffix=randomize_name,
-            in_cluster=False,
-            do_xcom_push=False,
-            cluster_context="default",
             full_pod_spec=pod_spec,
             name=name_base,
             image=image,
             labels={"hello": "world"},
         )
-        pod = self.run_pod(k)
+        pod = k.build_pod_request_obj(create_context(k))
 
         # make sure the kwargs takes precedence (and that name is randomized when expected)
         if randomize_name:
@@ -477,7 +563,7 @@ class TestKubernetesPodOperator:
             "task_id": "task",
             "try_number": "1",
             "airflow_version": mock.ANY,
-            "airflow_kpo_in_cluster": "True",
+            "airflow_kpo_in_cluster": str(k.hook.is_in_cluster),
             "run_id": "test",
         }
 
@@ -488,7 +574,7 @@ class TestKubernetesPodOperator:
             kind: Pod
             metadata:
               name: hello
-              namespace: mynamespace
+              namespace: templatenamespace
               labels:
                 foo: bar
             spec:
@@ -532,13 +618,13 @@ class TestKubernetesPodOperator:
             random_name_suffix=randomize_name,
             pod_template_file=pod_template_file,
         )
-        pod = self.run_pod(k)
+        pod = k.build_pod_request_obj(create_context(k))
 
         if randomize_name:
             assert pod.metadata.name.startswith("hello")
             assert pod.metadata.name != "hello"
         else:
-            pod.metadata.name == "hello"
+            assert pod.metadata.name == "hello"
         # Check labels are added from pod_template_file and
         # the pod identifying labels including Airflow version
         assert pod.metadata.labels == {
@@ -548,10 +634,10 @@ class TestKubernetesPodOperator:
             "task_id": "task",
             "try_number": "1",
             "airflow_version": mock.ANY,
-            "airflow_kpo_in_cluster": "True",
+            "airflow_kpo_in_cluster": str(k.hook.is_in_cluster),
             "run_id": "test",
         }
-        assert pod.metadata.namespace == "mynamespace"
+        assert pod.metadata.namespace == "templatenamespace"
         assert pod.spec.containers[0].image == "ubuntu:16.04"
         assert pod.spec.containers[0].image_pull_policy == "Always"
         assert pod.spec.containers[0].command == ["something"]
@@ -599,7 +685,7 @@ class TestKubernetesPodOperator:
             image=image,
             labels={"hello": "world"},
         )
-        pod = self.run_pod(k)
+        pod = k.build_pod_request_obj(create_context(k))
 
         # make sure the kwargs takes precedence (and that name is randomized when expected)
         if randomize_name:
@@ -618,55 +704,16 @@ class TestKubernetesPodOperator:
             "task_id": "task",
             "try_number": "1",
             "airflow_version": mock.ANY,
-            "airflow_kpo_in_cluster": "True",
+            "airflow_kpo_in_cluster": str(k.hook.is_in_cluster),
             "run_id": "test",
         }
 
-    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.fetch_container_logs")
-    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_container_completion")
-    def test_describes_pod_on_failure(self, await_container_mock, fetch_container_mock):
+    @patch(f"{POD_MANAGER_CLASS}.fetch_container_logs")
+    @patch(f"{POD_MANAGER_CLASS}.await_container_completion", new=MagicMock)
+    def test_no_handle_failure_on_success(self, fetch_container_mock):
         name_base = "test"
 
-        k = KubernetesPodOperator(
-            namespace="default",
-            image="ubuntu:16.04",
-            cmds=["bash", "-cx"],
-            arguments=["echo 10"],
-            labels={"foo": "bar"},
-            name=name_base,
-            task_id="task",
-            in_cluster=False,
-            do_xcom_push=False,
-            cluster_context="default",
-        )
-        fetch_container_mock.return_value = None
-        remote_pod_mock = MagicMock()
-        remote_pod_mock.status.phase = 'Failed'
-        self.await_pod_mock.return_value = remote_pod_mock
-
-        with pytest.raises(AirflowException, match=f"Pod {name_base}.[a-z0-9]+ returned a failure:.*"):
-            context = create_context(k)
-            k.execute(context=context)
-
-        assert k.client.read_namespaced_pod.called is False
-
-    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.fetch_container_logs")
-    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_container_completion")
-    def test_no_handle_failure_on_success(self, await_container_mock, fetch_container_mock):
-        name_base = "test"
-
-        k = KubernetesPodOperator(
-            namespace="default",
-            image="ubuntu:16.04",
-            cmds=["bash", "-cx"],
-            arguments=["echo 10"],
-            labels={"foo": "bar"},
-            name=name_base,
-            task_id="task",
-            in_cluster=False,
-            do_xcom_push=False,
-            cluster_context="default",
-        )
+        k = KubernetesPodOperator(name=name_base, task_id="task")
 
         fetch_container_mock.return_value = None
         remote_pod_mock = MagicMock()
@@ -677,8 +724,6 @@ class TestKubernetesPodOperator:
         self.run_pod(k)
 
     def test_create_with_affinity(self):
-        name_base = "test"
-
         affinity = {
             "nodeAffinity": {
                 "preferredDuringSchedulingIgnoredDuringExecution": [
@@ -693,16 +738,7 @@ class TestKubernetesPodOperator:
         }
 
         k = KubernetesPodOperator(
-            namespace="default",
-            image="ubuntu:16.04",
-            cmds=["bash", "-cx"],
-            arguments=["echo 10"],
-            labels={"foo": "bar"},
-            name=name_base,
             task_id="task",
-            in_cluster=False,
-            do_xcom_push=False,
-            cluster_context="default",
             affinity=affinity,
         )
 
@@ -727,16 +763,7 @@ class TestKubernetesPodOperator:
         )
 
         k = KubernetesPodOperator(
-            namespace="default",
-            image="ubuntu:16.04",
-            cmds=["bash", "-cx"],
-            arguments=["echo 10"],
-            labels={"foo": "bar"},
-            name=name_base,
             task_id="task",
-            in_cluster=False,
-            do_xcom_push=False,
-            cluster_context="default",
             affinity=k8s_api_affinity,
         )
 
@@ -751,16 +778,7 @@ class TestKubernetesPodOperator:
         tolerations = [{"key": "key", "operator": "Equal", "value": "value"}]
 
         k = KubernetesPodOperator(
-            namespace="default",
-            image="ubuntu:16.04",
-            cmds=["bash", "-cx"],
-            arguments=["echo 10"],
-            labels={"foo": "bar"},
-            name="name",
             task_id="task",
-            in_cluster=False,
-            do_xcom_push=False,
-            cluster_context="default",
             tolerations=tolerations,
         )
 
@@ -770,16 +788,7 @@ class TestKubernetesPodOperator:
         assert sanitized_pod["spec"]["tolerations"] == tolerations
 
         k = KubernetesPodOperator(
-            namespace="default",
-            image="ubuntu:16.04",
-            cmds=["bash", "-cx"],
-            arguments=["echo 10"],
-            labels={"foo": "bar"},
-            name="name",
             task_id="task",
-            in_cluster=False,
-            do_xcom_push=False,
-            cluster_context="default",
             tolerations=k8s_api_tolerations,
         )
 
@@ -792,16 +801,7 @@ class TestKubernetesPodOperator:
         node_selector = {"beta.kubernetes.io/os": "linux"}
 
         k = KubernetesPodOperator(
-            namespace="default",
-            image="ubuntu:16.04",
-            cmds=["bash", "-cx"],
-            arguments=["echo 10"],
-            labels={"foo": "bar"},
-            name="name",
             task_id="task",
-            in_cluster=False,
-            do_xcom_push=False,
-            cluster_context="default",
             node_selector=node_selector,
         )
 
@@ -815,16 +815,7 @@ class TestKubernetesPodOperator:
             DeprecationWarning, match="node_selectors is deprecated. Please use node_selector instead."
         ):
             k = KubernetesPodOperator(
-                namespace="default",
-                image="ubuntu:16.04",
-                cmds=["bash", "-cx"],
-                arguments=["echo 10"],
-                labels={"foo": "bar"},
-                name="name",
                 task_id="task",
-                in_cluster=False,
-                do_xcom_push=False,
-                cluster_context="default",
                 node_selectors=node_selector,
             )
 
@@ -834,8 +825,8 @@ class TestKubernetesPodOperator:
         assert sanitized_pod["spec"]["nodeSelector"] == node_selector
 
     @pytest.mark.parametrize('do_xcom_push', [True, False])
-    @mock.patch(f"{POD_MANAGER_CLASS}.extract_xcom")
-    @mock.patch(f"{POD_MANAGER_CLASS}.await_xcom_sidecar_container_start")
+    @patch(f"{POD_MANAGER_CLASS}.extract_xcom")
+    @patch(f"{POD_MANAGER_CLASS}.await_xcom_sidecar_container_start")
     def test_push_xcom_pod_info(
         self, mock_await_xcom_sidecar_container_start, mock_extract_xcom, do_xcom_push
     ):
@@ -843,12 +834,7 @@ class TestKubernetesPodOperator:
         mock_extract_xcom.return_value = '{}'
         mock_await_xcom_sidecar_container_start.return_value = None
         k = KubernetesPodOperator(
-            namespace="default",
-            image="ubuntu:16.04",
-            cmds=["bash", "-cx"],
-            name="test",
             task_id="task",
-            in_cluster=False,
             do_xcom_push=do_xcom_push,
         )
 
@@ -858,6 +844,7 @@ class TestKubernetesPodOperator:
         assert pod_name == pod.metadata.name
         assert pod_namespace == pod.metadata.namespace
 
+    @patch(HOOK_CLASS, new=MagicMock)
     def test_previous_pods_ignored_for_reattached(self):
         """
         When looking for pods to possibly reattach to,
@@ -865,22 +852,17 @@ class TestKubernetesPodOperator:
         """
         k = KubernetesPodOperator(
             namespace="default",
-            image="ubuntu:16.04",
-            name="test",
             task_id="task",
         )
         self.run_pod(k)
         _, kwargs = k.client.list_namespaced_pod.call_args
         assert 'already_checked!=True' in kwargs['label_selector']
 
-    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.delete_pod")
-    @mock.patch(f"{KPO_MODULE}.KubernetesPodOperator.patch_already_checked")
+    @patch(f"{POD_MANAGER_CLASS}.delete_pod")
+    @patch(f"{KPO_MODULE}.KubernetesPodOperator.patch_already_checked")
     def test_mark_checked_unexpected_exception(self, mock_patch_already_checked, mock_delete_pod):
         """If we aren't deleting pods and have an exception, mark it so we don't reattach to it"""
         k = KubernetesPodOperator(
-            namespace="default",
-            image="ubuntu:16.04",
-            name="test",
             task_id="task",
             is_delete_operator_pod=False,
         )
@@ -892,19 +874,14 @@ class TestKubernetesPodOperator:
         mock_delete_pod.assert_not_called()
 
     @pytest.mark.parametrize('do_xcom_push', [True, False])
-    @mock.patch(f"{POD_MANAGER_CLASS}.extract_xcom")
-    @mock.patch(f"{POD_MANAGER_CLASS}.await_xcom_sidecar_container_start")
+    @patch(f"{POD_MANAGER_CLASS}.extract_xcom")
+    @patch(f"{POD_MANAGER_CLASS}.await_xcom_sidecar_container_start")
     def test_wait_for_xcom_sidecar_iff_push_xcom(self, mock_await, mock_extract_xcom, do_xcom_push):
         """Assert we wait for xcom sidecar container if and only if we push xcom."""
         mock_extract_xcom.return_value = '{}'
         mock_await.return_value = None
         k = KubernetesPodOperator(
-            namespace="default",
-            image="ubuntu:16.04",
-            cmds=["bash", "-cx"],
-            name="test",
             task_id="task",
-            in_cluster=False,
             do_xcom_push=do_xcom_push,
         )
         self.run_pod(k)
@@ -914,15 +891,12 @@ class TestKubernetesPodOperator:
             mock_await.assert_not_called()
 
     @pytest.mark.parametrize('should_fail', [True, False])
-    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.delete_pod")
-    @mock.patch(f"{KPO_MODULE}.KubernetesPodOperator.patch_already_checked")
+    @patch(f"{POD_MANAGER_CLASS}.delete_pod")
+    @patch(f"{KPO_MODULE}.KubernetesPodOperator.patch_already_checked")
     def test_mark_checked_if_not_deleted(self, mock_patch_already_checked, mock_delete_pod, should_fail):
         """If we aren't deleting pods mark "checked" if the task completes (successful or otherwise)"""
         dag = DAG('hello2', start_date=pendulum.now())
         k = KubernetesPodOperator(
-            namespace="default",
-            image="ubuntu:16.04",
-            name="test",
             task_id="task",
             is_delete_operator_pod=False,
             dag=dag,
@@ -941,7 +915,7 @@ class TestKubernetesPodOperator:
 
 
 def test__suppress():
-    with mock.patch('logging.Logger.error') as mock_error:
+    with patch('logging.Logger.error') as mock_error:
 
         with _suppress(ValueError):
             raise ValueError("failure")