You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/03/06 21:46:55 UTC
[airflow] 11/37: More robust cleanup of executors in test_kubernetes_executor (#28281)
This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 389c074b82758ff56c0785ad19ee06ed1b9e860a
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Sat Dec 10 17:09:06 2022 +0100
More robust cleanup of executors in test_kubernetes_executor (#28281)
As a follow-up to #28047, this PR makes the test cleanup
more robust and resilient to any errors that might have caused
kubernetes executors to be left behind.
Wrapping start()/end() in try/finally makes the tests
completely resilient to cases where the asserts start to fail -
without it, any failure in the tests would cause the same resource
leakage as we initially had while #28047 was iterated on.
(cherry picked from commit 3b203bcb676853bd642a01121988b1cbe929307d)
---
tests/executors/test_kubernetes_executor.py | 348 +++++++++++++++-------------
1 file changed, 193 insertions(+), 155 deletions(-)
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 97619225e6..d1210765c0 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -162,9 +162,11 @@ class TestAirflowKubernetesScheduler:
kube_executor = KubernetesExecutor()
kube_executor.job_id = 1
kube_executor.start()
- kube_executor.kube_scheduler.delete_pod(pod_id, namespace)
-
- mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
+ try:
+ kube_executor.kube_scheduler.delete_pod(pod_id, namespace)
+ mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
+ finally:
+ kube_executor.end()
@unittest.skipIf(AirflowKubernetesScheduler is None, "kubernetes python package is not installed")
@mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
@@ -203,9 +205,11 @@ class TestAirflowKubernetesScheduler:
kube_executor = KubernetesExecutor()
kube_executor.job_id = 1
kube_executor.start()
-
- kube_executor.kube_scheduler.delete_pod(pod_id, namespace)
- mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
+ try:
+ kube_executor.kube_scheduler.delete_pod(pod_id, namespace)
+ mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
+ finally:
+ kube_executor.end()
class TestKubernetesExecutor:
@@ -266,32 +270,35 @@ class TestKubernetesExecutor:
with conf_vars(config):
kubernetes_executor = self.kubernetes_executor
kubernetes_executor.start()
- # Execute a task while the Api Throws errors
- try_number = 1
- task_instance_key = TaskInstanceKey("dag", "task", "run_id", try_number)
- kubernetes_executor.execute_async(
- key=task_instance_key,
- queue=None,
- command=["airflow", "tasks", "run", "true", "some_parameter"],
- )
- kubernetes_executor.sync()
+ try:
+ # Execute a task while the Api Throws errors
+ try_number = 1
+ task_instance_key = TaskInstanceKey("dag", "task", "run_id", try_number)
+ kubernetes_executor.execute_async(
+ key=task_instance_key,
+ queue=None,
+ command=["airflow", "tasks", "run", "true", "some_parameter"],
+ )
+ kubernetes_executor.sync()
- assert mock_kube_client.create_namespaced_pod.call_count == 1
+ assert mock_kube_client.create_namespaced_pod.call_count == 1
- if should_requeue:
- assert not kubernetes_executor.task_queue.empty()
+ if should_requeue:
+ assert not kubernetes_executor.task_queue.empty()
- # Disable the ApiException
- mock_kube_client.create_namespaced_pod.side_effect = None
+ # Disable the ApiException
+ mock_kube_client.create_namespaced_pod.side_effect = None
- # Execute the task without errors should empty the queue
- mock_kube_client.create_namespaced_pod.reset_mock()
- kubernetes_executor.sync()
- assert mock_kube_client.create_namespaced_pod.called
- assert kubernetes_executor.task_queue.empty()
- else:
- assert kubernetes_executor.task_queue.empty()
- assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED
+ # Execute the task without errors should empty the queue
+ mock_kube_client.create_namespaced_pod.reset_mock()
+ kubernetes_executor.sync()
+ assert mock_kube_client.create_namespaced_pod.called
+ assert kubernetes_executor.task_queue.empty()
+ else:
+ assert kubernetes_executor.task_queue.empty()
+ assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED
+ finally:
+ kubernetes_executor.end()
@pytest.mark.skipif(
AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed"
@@ -311,23 +318,26 @@ class TestKubernetesExecutor:
kubernetes_executor = self.kubernetes_executor
kubernetes_executor.start()
- try_number = 1
- task_instance_key = TaskInstanceKey("dag", "task", "run_id", try_number)
- kubernetes_executor.execute_async(
- key=task_instance_key,
- queue=None,
- command=["airflow", "tasks", "run", "true", "some_parameter"],
- )
- kubernetes_executor.sync()
+ try:
+ try_number = 1
+ task_instance_key = TaskInstanceKey("dag", "task", "run_id", try_number)
+ kubernetes_executor.execute_async(
+ key=task_instance_key,
+ queue=None,
+ command=["airflow", "tasks", "run", "true", "some_parameter"],
+ )
+ kubernetes_executor.sync()
- # The pod_mutation_hook should have been called once.
- assert mock_pmh.call_count == 1
- # There should be no pod creation request sent
- assert mock_kube_client.create_namespaced_pod.call_count == 0
- # The task is not re-queued and there is the failed record in event_buffer
- assert kubernetes_executor.task_queue.empty()
- assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED
- assert kubernetes_executor.event_buffer[task_instance_key][1].args[0] == exception_in_pmh
+ # The pod_mutation_hook should have been called once.
+ assert mock_pmh.call_count == 1
+ # There should be no pod creation request sent
+ assert mock_kube_client.create_namespaced_pod.call_count == 0
+ # The task is not re-queued and there is the failed record in event_buffer
+ assert kubernetes_executor.task_queue.empty()
+ assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED
+ assert kubernetes_executor.event_buffer[task_instance_key][1].args[0] == exception_in_pmh
+ finally:
+ kubernetes_executor.end()
@pytest.mark.skipif(
AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed"
@@ -351,19 +361,22 @@ class TestKubernetesExecutor:
with conf_vars(config):
kubernetes_executor = self.kubernetes_executor
kubernetes_executor.start()
- # Execute a task while the Api Throws errors
- try_number = 1
- task_instance_key = TaskInstanceKey("dag", "task", "run_id", try_number)
- kubernetes_executor.execute_async(
- key=task_instance_key,
- queue=None,
- command=["airflow", "tasks", "run", "true", "some_parameter"],
- )
- kubernetes_executor.sync()
+ try:
+ # Execute a task while the Api Throws errors
+ try_number = 1
+ task_instance_key = TaskInstanceKey("dag", "task", "run_id", try_number)
+ kubernetes_executor.execute_async(
+ key=task_instance_key,
+ queue=None,
+ command=["airflow", "tasks", "run", "true", "some_parameter"],
+ )
+ kubernetes_executor.sync()
- assert kubernetes_executor.task_queue.empty()
- assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED
- assert kubernetes_executor.event_buffer[task_instance_key][1].args[0] == fail_msg
+ assert kubernetes_executor.task_queue.empty()
+ assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED
+ assert kubernetes_executor.event_buffer[task_instance_key][1].args[0] == fail_msg
+ finally:
+ kubernetes_executor.end()
@mock.patch("airflow.executors.kubernetes_executor.KubeConfig")
@mock.patch("airflow.executors.kubernetes_executor.KubernetesExecutor.sync")
@@ -384,20 +397,22 @@ class TestKubernetesExecutor:
def test_invalid_executor_config(self, mock_get_kube_client, mock_kubernetes_job_watcher):
executor = self.kubernetes_executor
executor.start()
+ try:
+ assert executor.event_buffer == {}
+ executor.execute_async(
+ key=("dag", "task", datetime.utcnow(), 1),
+ queue=None,
+ command=["airflow", "tasks", "run", "true", "some_parameter"],
+ executor_config=k8s.V1Pod(
+ spec=k8s.V1PodSpec(
+ containers=[k8s.V1Container(name="base", image="myimage", image_pull_policy="Always")]
+ )
+ ),
+ )
- assert executor.event_buffer == {}
- executor.execute_async(
- key=("dag", "task", datetime.utcnow(), 1),
- queue=None,
- command=["airflow", "tasks", "run", "true", "some_parameter"],
- executor_config=k8s.V1Pod(
- spec=k8s.V1PodSpec(
- containers=[k8s.V1Container(name="base", image="myimage", image_pull_policy="Always")]
- )
- ),
- )
-
- assert list(executor.event_buffer.values())[0][1] == "Invalid executor_config passed"
+ assert list(executor.event_buffer.values())[0][1] == "Invalid executor_config passed"
+ finally:
+ executor.end()
@pytest.mark.execution_timeout(10)
@pytest.mark.skipif(
@@ -417,83 +432,88 @@ class TestKubernetesExecutor:
with conf_vars({("kubernetes", "pod_template_file"): ""}):
executor = self.kubernetes_executor
executor.start()
+ try:
+ assert executor.event_buffer == {}
+ assert executor.task_queue.empty()
+
+ executor.execute_async(
+ key=TaskInstanceKey("dag", "task", "run_id", 1),
+ queue=None,
+ command=["airflow", "tasks", "run", "true", "some_parameter"],
+ executor_config={
+ "pod_template_file": template_file,
+ "pod_override": k8s.V1Pod(
+ metadata=k8s.V1ObjectMeta(labels={"release": "stable"}),
+ spec=k8s.V1PodSpec(
+ containers=[k8s.V1Container(name="base", image="airflow:3.6")],
+ ),
+ ),
+ },
+ )
- assert executor.event_buffer == {}
- assert executor.task_queue.empty()
-
- executor.execute_async(
- key=TaskInstanceKey("dag", "task", "run_id", 1),
- queue=None,
- command=["airflow", "tasks", "run", "true", "some_parameter"],
- executor_config={
- "pod_template_file": template_file,
- "pod_override": k8s.V1Pod(
- metadata=k8s.V1ObjectMeta(labels={"release": "stable"}),
+ assert not executor.task_queue.empty()
+ task = executor.task_queue.get_nowait()
+ _, _, expected_executor_config, expected_pod_template_file = task
+ executor.task_queue.task_done()
+ # Test that the correct values have been put to queue
+ assert expected_executor_config.metadata.labels == {"release": "stable"}
+ assert expected_pod_template_file == template_file
+
+ self.kubernetes_executor.kube_scheduler.run_next(task)
+ mock_run_pod_async.assert_called_once_with(
+ k8s.V1Pod(
+ api_version="v1",
+ kind="Pod",
+ metadata=k8s.V1ObjectMeta(
+ name=mock.ANY,
+ namespace="default",
+ annotations={
+ "dag_id": "dag",
+ "run_id": "run_id",
+ "task_id": "task",
+ "try_number": "1",
+ },
+ labels={
+ "airflow-worker": "5",
+ "airflow_version": mock.ANY,
+ "dag_id": "dag",
+ "run_id": "run_id",
+ "kubernetes_executor": "True",
+ "mylabel": "foo",
+ "release": "stable",
+ "task_id": "task",
+ "try_number": "1",
+ },
+ ),
spec=k8s.V1PodSpec(
- containers=[k8s.V1Container(name="base", image="airflow:3.6")],
+ containers=[
+ k8s.V1Container(
+ name="base",
+ image="airflow:3.6",
+ args=["airflow", "tasks", "run", "true", "some_parameter"],
+ env=[k8s.V1EnvVar(name="AIRFLOW_IS_K8S_EXECUTOR_POD", value="True")],
+ )
+ ],
+ image_pull_secrets=[k8s.V1LocalObjectReference(name="airflow-registry")],
+ scheduler_name="default-scheduler",
+ security_context=k8s.V1PodSecurityContext(fs_group=50000, run_as_user=50000),
),
- ),
- },
- )
-
- assert not executor.task_queue.empty()
- task = executor.task_queue.get_nowait()
- _, _, expected_executor_config, expected_pod_template_file = task
-
- # Test that the correct values have been put to queue
- assert expected_executor_config.metadata.labels == {"release": "stable"}
- assert expected_pod_template_file == template_file
-
- self.kubernetes_executor.kube_scheduler.run_next(task)
- mock_run_pod_async.assert_called_once_with(
- k8s.V1Pod(
- api_version="v1",
- kind="Pod",
- metadata=k8s.V1ObjectMeta(
- name=mock.ANY,
- namespace="default",
- annotations={
- "dag_id": "dag",
- "run_id": "run_id",
- "task_id": "task",
- "try_number": "1",
- },
- labels={
- "airflow-worker": "5",
- "airflow_version": mock.ANY,
- "dag_id": "dag",
- "run_id": "run_id",
- "kubernetes_executor": "True",
- "mylabel": "foo",
- "release": "stable",
- "task_id": "task",
- "try_number": "1",
- },
- ),
- spec=k8s.V1PodSpec(
- containers=[
- k8s.V1Container(
- name="base",
- image="airflow:3.6",
- args=["airflow", "tasks", "run", "true", "some_parameter"],
- env=[k8s.V1EnvVar(name="AIRFLOW_IS_K8S_EXECUTOR_POD", value="True")],
- )
- ],
- image_pull_secrets=[k8s.V1LocalObjectReference(name="airflow-registry")],
- scheduler_name="default-scheduler",
- security_context=k8s.V1PodSecurityContext(fs_group=50000, run_as_user=50000),
- ),
+ )
)
- )
+ finally:
+ executor.end()
@mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
@mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
def test_change_state_running(self, mock_get_kube_client, mock_kubernetes_job_watcher):
executor = self.kubernetes_executor
executor.start()
- key = ("dag_id", "task_id", "run_id", "try_number1")
- executor._change_state(key, State.RUNNING, "pod_id", "default")
- assert executor.event_buffer[key][0] == State.RUNNING
+ try:
+ key = ("dag_id", "task_id", "run_id", "try_number1")
+ executor._change_state(key, State.RUNNING, "pod_id", "default")
+ assert executor.event_buffer[key][0] == State.RUNNING
+ finally:
+ executor.end()
@mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
@mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
@@ -501,10 +521,13 @@ class TestKubernetesExecutor:
def test_change_state_success(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher):
executor = self.kubernetes_executor
executor.start()
- key = ("dag_id", "task_id", "run_id", "try_number2")
- executor._change_state(key, State.SUCCESS, "pod_id", "default")
- assert executor.event_buffer[key][0] == State.SUCCESS
- mock_delete_pod.assert_called_once_with("pod_id", "default")
+ try:
+ key = ("dag_id", "task_id", "run_id", "try_number2")
+ executor._change_state(key, State.SUCCESS, "pod_id", "default")
+ assert executor.event_buffer[key][0] == State.SUCCESS
+ mock_delete_pod.assert_called_once_with("pod_id", "default")
+ finally:
+ executor.end()
@mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
@mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
@@ -516,10 +539,13 @@ class TestKubernetesExecutor:
executor.kube_config.delete_worker_pods = False
executor.kube_config.delete_worker_pods_on_failure = False
executor.start()
- key = ("dag_id", "task_id", "run_id", "try_number3")
- executor._change_state(key, State.FAILED, "pod_id", "default")
- assert executor.event_buffer[key][0] == State.FAILED
- mock_delete_pod.assert_not_called()
+ try:
+ key = ("dag_id", "task_id", "run_id", "try_number3")
+ executor._change_state(key, State.FAILED, "pod_id", "default")
+ assert executor.event_buffer[key][0] == State.FAILED
+ mock_delete_pod.assert_not_called()
+ finally:
+ executor.end()
@mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
@mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
@@ -532,10 +558,13 @@ class TestKubernetesExecutor:
executor.kube_config.delete_worker_pods_on_failure = False
executor.start()
- key = ("dag_id", "task_id", "run_id", "try_number2")
- executor._change_state(key, State.SUCCESS, "pod_id", "default")
- assert executor.event_buffer[key][0] == State.SUCCESS
- mock_delete_pod.assert_not_called()
+ try:
+ key = ("dag_id", "task_id", "run_id", "try_number2")
+ executor._change_state(key, State.SUCCESS, "pod_id", "default")
+ assert executor.event_buffer[key][0] == State.SUCCESS
+ mock_delete_pod.assert_not_called()
+ finally:
+ executor.end()
@mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
@mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
@@ -547,10 +576,13 @@ class TestKubernetesExecutor:
executor.kube_config.delete_worker_pods_on_failure = True
executor.start()
- key = ("dag_id", "task_id", "run_id", "try_number2")
- executor._change_state(key, State.FAILED, "pod_id", "test-namespace")
- assert executor.event_buffer[key][0] == State.FAILED
- mock_delete_pod.assert_called_once_with("pod_id", "test-namespace")
+ try:
+ key = ("dag_id", "task_id", "run_id", "try_number2")
+ executor._change_state(key, State.FAILED, "pod_id", "test-namespace")
+ assert executor.event_buffer[key][0] == State.FAILED
+ mock_delete_pod.assert_called_once_with("pod_id", "test-namespace")
+ finally:
+ executor.end()
@mock.patch("airflow.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task")
@mock.patch("airflow.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods")
@@ -789,8 +821,11 @@ class TestKubernetesExecutor:
executor = KubernetesExecutor()
executor.job_id = 123
executor.start()
- assert 2 == len(executor.event_scheduler.queue)
- executor._check_worker_pods_pending_timeout()
+ try:
+ assert 2 == len(executor.event_scheduler.queue)
+ executor._check_worker_pods_pending_timeout()
+ finally:
+ executor.end()
mock_kube_client.list_namespaced_pod.assert_called_once_with(
"mynamespace",
@@ -831,7 +866,10 @@ class TestKubernetesExecutor:
executor = KubernetesExecutor()
executor.job_id = 123
executor.start()
- executor._check_worker_pods_pending_timeout()
+ try:
+ executor._check_worker_pods_pending_timeout()
+ finally:
+ executor.end()
mock_kube_client.list_pod_for_all_namespaces.assert_called_once_with(
field_selector="status.phase=Pending",