You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/03/06 21:46:55 UTC

[airflow] 11/37: More robust cleanup of executors in test_kubernetes_executor (#28281)

This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 389c074b82758ff56c0785ad19ee06ed1b9e860a
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Sat Dec 10 17:09:06 2022 +0100

    More robust cleanup of executors in test_kubernetes_executor (#28281)
    
    As a follow-up to #28047, this PR makes the test cleanup
    more robust and resilient to any errors that might otherwise
    leave kubernetes executors behind.
    
    Wrapping the code between start() and end() in try/finally
    makes the tests completely resilient to cases where the asserts
    start to fail — without it, any test failure would cause the
    same resource leakage we initially had while #28047 was being
    iterated on.
    
    (cherry picked from commit 3b203bcb676853bd642a01121988b1cbe929307d)
---
 tests/executors/test_kubernetes_executor.py | 348 +++++++++++++++-------------
 1 file changed, 193 insertions(+), 155 deletions(-)

diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 97619225e6..d1210765c0 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -162,9 +162,11 @@ class TestAirflowKubernetesScheduler:
         kube_executor = KubernetesExecutor()
         kube_executor.job_id = 1
         kube_executor.start()
-        kube_executor.kube_scheduler.delete_pod(pod_id, namespace)
-
-        mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
+        try:
+            kube_executor.kube_scheduler.delete_pod(pod_id, namespace)
+            mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
+        finally:
+            kube_executor.end()
 
     @unittest.skipIf(AirflowKubernetesScheduler is None, "kubernetes python package is not installed")
     @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
@@ -203,9 +205,11 @@ class TestAirflowKubernetesScheduler:
         kube_executor = KubernetesExecutor()
         kube_executor.job_id = 1
         kube_executor.start()
-
-        kube_executor.kube_scheduler.delete_pod(pod_id, namespace)
-        mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
+        try:
+            kube_executor.kube_scheduler.delete_pod(pod_id, namespace)
+            mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
+        finally:
+            kube_executor.end()
 
 
 class TestKubernetesExecutor:
@@ -266,32 +270,35 @@ class TestKubernetesExecutor:
         with conf_vars(config):
             kubernetes_executor = self.kubernetes_executor
             kubernetes_executor.start()
-            # Execute a task while the Api Throws errors
-            try_number = 1
-            task_instance_key = TaskInstanceKey("dag", "task", "run_id", try_number)
-            kubernetes_executor.execute_async(
-                key=task_instance_key,
-                queue=None,
-                command=["airflow", "tasks", "run", "true", "some_parameter"],
-            )
-            kubernetes_executor.sync()
+            try:
+                # Execute a task while the Api Throws errors
+                try_number = 1
+                task_instance_key = TaskInstanceKey("dag", "task", "run_id", try_number)
+                kubernetes_executor.execute_async(
+                    key=task_instance_key,
+                    queue=None,
+                    command=["airflow", "tasks", "run", "true", "some_parameter"],
+                )
+                kubernetes_executor.sync()
 
-            assert mock_kube_client.create_namespaced_pod.call_count == 1
+                assert mock_kube_client.create_namespaced_pod.call_count == 1
 
-            if should_requeue:
-                assert not kubernetes_executor.task_queue.empty()
+                if should_requeue:
+                    assert not kubernetes_executor.task_queue.empty()
 
-                # Disable the ApiException
-                mock_kube_client.create_namespaced_pod.side_effect = None
+                    # Disable the ApiException
+                    mock_kube_client.create_namespaced_pod.side_effect = None
 
-                # Execute the task without errors should empty the queue
-                mock_kube_client.create_namespaced_pod.reset_mock()
-                kubernetes_executor.sync()
-                assert mock_kube_client.create_namespaced_pod.called
-                assert kubernetes_executor.task_queue.empty()
-            else:
-                assert kubernetes_executor.task_queue.empty()
-                assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED
+                    # Execute the task without errors should empty the queue
+                    mock_kube_client.create_namespaced_pod.reset_mock()
+                    kubernetes_executor.sync()
+                    assert mock_kube_client.create_namespaced_pod.called
+                    assert kubernetes_executor.task_queue.empty()
+                else:
+                    assert kubernetes_executor.task_queue.empty()
+                    assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED
+            finally:
+                kubernetes_executor.end()
 
     @pytest.mark.skipif(
         AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed"
@@ -311,23 +318,26 @@ class TestKubernetesExecutor:
 
         kubernetes_executor = self.kubernetes_executor
         kubernetes_executor.start()
-        try_number = 1
-        task_instance_key = TaskInstanceKey("dag", "task", "run_id", try_number)
-        kubernetes_executor.execute_async(
-            key=task_instance_key,
-            queue=None,
-            command=["airflow", "tasks", "run", "true", "some_parameter"],
-        )
-        kubernetes_executor.sync()
+        try:
+            try_number = 1
+            task_instance_key = TaskInstanceKey("dag", "task", "run_id", try_number)
+            kubernetes_executor.execute_async(
+                key=task_instance_key,
+                queue=None,
+                command=["airflow", "tasks", "run", "true", "some_parameter"],
+            )
+            kubernetes_executor.sync()
 
-        # The pod_mutation_hook should have been called once.
-        assert mock_pmh.call_count == 1
-        # There should be no pod creation request sent
-        assert mock_kube_client.create_namespaced_pod.call_count == 0
-        # The task is not re-queued and there is the failed record in event_buffer
-        assert kubernetes_executor.task_queue.empty()
-        assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED
-        assert kubernetes_executor.event_buffer[task_instance_key][1].args[0] == exception_in_pmh
+            # The pod_mutation_hook should have been called once.
+            assert mock_pmh.call_count == 1
+            # There should be no pod creation request sent
+            assert mock_kube_client.create_namespaced_pod.call_count == 0
+            # The task is not re-queued and there is the failed record in event_buffer
+            assert kubernetes_executor.task_queue.empty()
+            assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED
+            assert kubernetes_executor.event_buffer[task_instance_key][1].args[0] == exception_in_pmh
+        finally:
+            kubernetes_executor.end()
 
     @pytest.mark.skipif(
         AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed"
@@ -351,19 +361,22 @@ class TestKubernetesExecutor:
         with conf_vars(config):
             kubernetes_executor = self.kubernetes_executor
             kubernetes_executor.start()
-            # Execute a task while the Api Throws errors
-            try_number = 1
-            task_instance_key = TaskInstanceKey("dag", "task", "run_id", try_number)
-            kubernetes_executor.execute_async(
-                key=task_instance_key,
-                queue=None,
-                command=["airflow", "tasks", "run", "true", "some_parameter"],
-            )
-            kubernetes_executor.sync()
+            try:
+                # Execute a task while the Api Throws errors
+                try_number = 1
+                task_instance_key = TaskInstanceKey("dag", "task", "run_id", try_number)
+                kubernetes_executor.execute_async(
+                    key=task_instance_key,
+                    queue=None,
+                    command=["airflow", "tasks", "run", "true", "some_parameter"],
+                )
+                kubernetes_executor.sync()
 
-            assert kubernetes_executor.task_queue.empty()
-            assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED
-            assert kubernetes_executor.event_buffer[task_instance_key][1].args[0] == fail_msg
+                assert kubernetes_executor.task_queue.empty()
+                assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED
+                assert kubernetes_executor.event_buffer[task_instance_key][1].args[0] == fail_msg
+            finally:
+                kubernetes_executor.end()
 
     @mock.patch("airflow.executors.kubernetes_executor.KubeConfig")
     @mock.patch("airflow.executors.kubernetes_executor.KubernetesExecutor.sync")
@@ -384,20 +397,22 @@ class TestKubernetesExecutor:
     def test_invalid_executor_config(self, mock_get_kube_client, mock_kubernetes_job_watcher):
         executor = self.kubernetes_executor
         executor.start()
+        try:
+            assert executor.event_buffer == {}
+            executor.execute_async(
+                key=("dag", "task", datetime.utcnow(), 1),
+                queue=None,
+                command=["airflow", "tasks", "run", "true", "some_parameter"],
+                executor_config=k8s.V1Pod(
+                    spec=k8s.V1PodSpec(
+                        containers=[k8s.V1Container(name="base", image="myimage", image_pull_policy="Always")]
+                    )
+                ),
+            )
 
-        assert executor.event_buffer == {}
-        executor.execute_async(
-            key=("dag", "task", datetime.utcnow(), 1),
-            queue=None,
-            command=["airflow", "tasks", "run", "true", "some_parameter"],
-            executor_config=k8s.V1Pod(
-                spec=k8s.V1PodSpec(
-                    containers=[k8s.V1Container(name="base", image="myimage", image_pull_policy="Always")]
-                )
-            ),
-        )
-
-        assert list(executor.event_buffer.values())[0][1] == "Invalid executor_config passed"
+            assert list(executor.event_buffer.values())[0][1] == "Invalid executor_config passed"
+        finally:
+            executor.end()
 
     @pytest.mark.execution_timeout(10)
     @pytest.mark.skipif(
@@ -417,83 +432,88 @@ class TestKubernetesExecutor:
         with conf_vars({("kubernetes", "pod_template_file"): ""}):
             executor = self.kubernetes_executor
             executor.start()
+            try:
+                assert executor.event_buffer == {}
+                assert executor.task_queue.empty()
+
+                executor.execute_async(
+                    key=TaskInstanceKey("dag", "task", "run_id", 1),
+                    queue=None,
+                    command=["airflow", "tasks", "run", "true", "some_parameter"],
+                    executor_config={
+                        "pod_template_file": template_file,
+                        "pod_override": k8s.V1Pod(
+                            metadata=k8s.V1ObjectMeta(labels={"release": "stable"}),
+                            spec=k8s.V1PodSpec(
+                                containers=[k8s.V1Container(name="base", image="airflow:3.6")],
+                            ),
+                        ),
+                    },
+                )
 
-            assert executor.event_buffer == {}
-            assert executor.task_queue.empty()
-
-            executor.execute_async(
-                key=TaskInstanceKey("dag", "task", "run_id", 1),
-                queue=None,
-                command=["airflow", "tasks", "run", "true", "some_parameter"],
-                executor_config={
-                    "pod_template_file": template_file,
-                    "pod_override": k8s.V1Pod(
-                        metadata=k8s.V1ObjectMeta(labels={"release": "stable"}),
+                assert not executor.task_queue.empty()
+                task = executor.task_queue.get_nowait()
+                _, _, expected_executor_config, expected_pod_template_file = task
+                executor.task_queue.task_done()
+                # Test that the correct values have been put to queue
+                assert expected_executor_config.metadata.labels == {"release": "stable"}
+                assert expected_pod_template_file == template_file
+
+                self.kubernetes_executor.kube_scheduler.run_next(task)
+                mock_run_pod_async.assert_called_once_with(
+                    k8s.V1Pod(
+                        api_version="v1",
+                        kind="Pod",
+                        metadata=k8s.V1ObjectMeta(
+                            name=mock.ANY,
+                            namespace="default",
+                            annotations={
+                                "dag_id": "dag",
+                                "run_id": "run_id",
+                                "task_id": "task",
+                                "try_number": "1",
+                            },
+                            labels={
+                                "airflow-worker": "5",
+                                "airflow_version": mock.ANY,
+                                "dag_id": "dag",
+                                "run_id": "run_id",
+                                "kubernetes_executor": "True",
+                                "mylabel": "foo",
+                                "release": "stable",
+                                "task_id": "task",
+                                "try_number": "1",
+                            },
+                        ),
                         spec=k8s.V1PodSpec(
-                            containers=[k8s.V1Container(name="base", image="airflow:3.6")],
+                            containers=[
+                                k8s.V1Container(
+                                    name="base",
+                                    image="airflow:3.6",
+                                    args=["airflow", "tasks", "run", "true", "some_parameter"],
+                                    env=[k8s.V1EnvVar(name="AIRFLOW_IS_K8S_EXECUTOR_POD", value="True")],
+                                )
+                            ],
+                            image_pull_secrets=[k8s.V1LocalObjectReference(name="airflow-registry")],
+                            scheduler_name="default-scheduler",
+                            security_context=k8s.V1PodSecurityContext(fs_group=50000, run_as_user=50000),
                         ),
-                    ),
-                },
-            )
-
-            assert not executor.task_queue.empty()
-            task = executor.task_queue.get_nowait()
-            _, _, expected_executor_config, expected_pod_template_file = task
-
-            # Test that the correct values have been put to queue
-            assert expected_executor_config.metadata.labels == {"release": "stable"}
-            assert expected_pod_template_file == template_file
-
-            self.kubernetes_executor.kube_scheduler.run_next(task)
-            mock_run_pod_async.assert_called_once_with(
-                k8s.V1Pod(
-                    api_version="v1",
-                    kind="Pod",
-                    metadata=k8s.V1ObjectMeta(
-                        name=mock.ANY,
-                        namespace="default",
-                        annotations={
-                            "dag_id": "dag",
-                            "run_id": "run_id",
-                            "task_id": "task",
-                            "try_number": "1",
-                        },
-                        labels={
-                            "airflow-worker": "5",
-                            "airflow_version": mock.ANY,
-                            "dag_id": "dag",
-                            "run_id": "run_id",
-                            "kubernetes_executor": "True",
-                            "mylabel": "foo",
-                            "release": "stable",
-                            "task_id": "task",
-                            "try_number": "1",
-                        },
-                    ),
-                    spec=k8s.V1PodSpec(
-                        containers=[
-                            k8s.V1Container(
-                                name="base",
-                                image="airflow:3.6",
-                                args=["airflow", "tasks", "run", "true", "some_parameter"],
-                                env=[k8s.V1EnvVar(name="AIRFLOW_IS_K8S_EXECUTOR_POD", value="True")],
-                            )
-                        ],
-                        image_pull_secrets=[k8s.V1LocalObjectReference(name="airflow-registry")],
-                        scheduler_name="default-scheduler",
-                        security_context=k8s.V1PodSecurityContext(fs_group=50000, run_as_user=50000),
-                    ),
+                    )
                 )
-            )
+            finally:
+                executor.end()
 
     @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
     @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
     def test_change_state_running(self, mock_get_kube_client, mock_kubernetes_job_watcher):
         executor = self.kubernetes_executor
         executor.start()
-        key = ("dag_id", "task_id", "run_id", "try_number1")
-        executor._change_state(key, State.RUNNING, "pod_id", "default")
-        assert executor.event_buffer[key][0] == State.RUNNING
+        try:
+            key = ("dag_id", "task_id", "run_id", "try_number1")
+            executor._change_state(key, State.RUNNING, "pod_id", "default")
+            assert executor.event_buffer[key][0] == State.RUNNING
+        finally:
+            executor.end()
 
     @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
     @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
@@ -501,10 +521,13 @@ class TestKubernetesExecutor:
     def test_change_state_success(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher):
         executor = self.kubernetes_executor
         executor.start()
-        key = ("dag_id", "task_id", "run_id", "try_number2")
-        executor._change_state(key, State.SUCCESS, "pod_id", "default")
-        assert executor.event_buffer[key][0] == State.SUCCESS
-        mock_delete_pod.assert_called_once_with("pod_id", "default")
+        try:
+            key = ("dag_id", "task_id", "run_id", "try_number2")
+            executor._change_state(key, State.SUCCESS, "pod_id", "default")
+            assert executor.event_buffer[key][0] == State.SUCCESS
+            mock_delete_pod.assert_called_once_with("pod_id", "default")
+        finally:
+            executor.end()
 
     @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
     @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
@@ -516,10 +539,13 @@ class TestKubernetesExecutor:
         executor.kube_config.delete_worker_pods = False
         executor.kube_config.delete_worker_pods_on_failure = False
         executor.start()
-        key = ("dag_id", "task_id", "run_id", "try_number3")
-        executor._change_state(key, State.FAILED, "pod_id", "default")
-        assert executor.event_buffer[key][0] == State.FAILED
-        mock_delete_pod.assert_not_called()
+        try:
+            key = ("dag_id", "task_id", "run_id", "try_number3")
+            executor._change_state(key, State.FAILED, "pod_id", "default")
+            assert executor.event_buffer[key][0] == State.FAILED
+            mock_delete_pod.assert_not_called()
+        finally:
+            executor.end()
 
     @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
     @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
@@ -532,10 +558,13 @@ class TestKubernetesExecutor:
         executor.kube_config.delete_worker_pods_on_failure = False
 
         executor.start()
-        key = ("dag_id", "task_id", "run_id", "try_number2")
-        executor._change_state(key, State.SUCCESS, "pod_id", "default")
-        assert executor.event_buffer[key][0] == State.SUCCESS
-        mock_delete_pod.assert_not_called()
+        try:
+            key = ("dag_id", "task_id", "run_id", "try_number2")
+            executor._change_state(key, State.SUCCESS, "pod_id", "default")
+            assert executor.event_buffer[key][0] == State.SUCCESS
+            mock_delete_pod.assert_not_called()
+        finally:
+            executor.end()
 
     @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
     @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
@@ -547,10 +576,13 @@ class TestKubernetesExecutor:
         executor.kube_config.delete_worker_pods_on_failure = True
 
         executor.start()
-        key = ("dag_id", "task_id", "run_id", "try_number2")
-        executor._change_state(key, State.FAILED, "pod_id", "test-namespace")
-        assert executor.event_buffer[key][0] == State.FAILED
-        mock_delete_pod.assert_called_once_with("pod_id", "test-namespace")
+        try:
+            key = ("dag_id", "task_id", "run_id", "try_number2")
+            executor._change_state(key, State.FAILED, "pod_id", "test-namespace")
+            assert executor.event_buffer[key][0] == State.FAILED
+            mock_delete_pod.assert_called_once_with("pod_id", "test-namespace")
+        finally:
+            executor.end()
 
     @mock.patch("airflow.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task")
     @mock.patch("airflow.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods")
@@ -789,8 +821,11 @@ class TestKubernetesExecutor:
             executor = KubernetesExecutor()
             executor.job_id = 123
             executor.start()
-            assert 2 == len(executor.event_scheduler.queue)
-            executor._check_worker_pods_pending_timeout()
+            try:
+                assert 2 == len(executor.event_scheduler.queue)
+                executor._check_worker_pods_pending_timeout()
+            finally:
+                executor.end()
 
         mock_kube_client.list_namespaced_pod.assert_called_once_with(
             "mynamespace",
@@ -831,7 +866,10 @@ class TestKubernetesExecutor:
             executor = KubernetesExecutor()
             executor.job_id = 123
             executor.start()
-            executor._check_worker_pods_pending_timeout()
+            try:
+                executor._check_worker_pods_pending_timeout()
+            finally:
+                executor.end()
 
         mock_kube_client.list_pod_for_all_namespaces.assert_called_once_with(
             field_selector="status.phase=Pending",