You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this text rendering.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/04/26 21:19:18 UTC
[airflow] branch master updated: Fix Task Adoption in ``KubernetesExecutor`` (#14795)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 344e829 Fix Task Adoption in ``KubernetesExecutor`` (#14795)
344e829 is described below
commit 344e8296d7e8a9af61edd0aae4f7784ecf33e5a2
Author: atrbgithub <14...@users.noreply.github.com>
AuthorDate: Mon Apr 26 22:19:02 2021 +0100
Fix Task Adoption in ``KubernetesExecutor`` (#14795)
Ensure that we use `ti.queued_by_job_id` when searching for pods. The `queued_by_job_id` is used by
`adopt_launched_task` when updating the labels. Without this, after restarting the scheduler
a third time, the scheduler does not find the pods, as it is still searching for the id of
the original scheduler (`ti.external_executor_id`).
Co-Authored-By: samwedge <19...@users.noreply.github.com>
Co-Authored-By: philip-hope <64...@users.noreply.github.com>
Co-authored-by: Jed Cunningham <66...@users.noreply.github.com>
---
airflow/executors/kubernetes_executor.py | 6 +--
tests/executors/test_kubernetes_executor.py | 76 +++++++++++++++++++++++++++++
2 files changed, 79 insertions(+), 3 deletions(-)
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 4078976..a2ae701 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -651,15 +651,15 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
self.event_buffer[key] = state, None
def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance]:
- tis_to_flush = [ti for ti in tis if not ti.external_executor_id]
- scheduler_job_ids = [ti.external_executor_id for ti in tis]
+ tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
+ scheduler_job_ids = {ti.queued_by_job_id for ti in tis}
pod_ids = {
create_pod_id(
dag_id=pod_generator.make_safe_label_value(ti.dag_id),
task_id=pod_generator.make_safe_label_value(ti.task_id),
): ti
for ti in tis
- if ti.external_executor_id
+ if ti.queued_by_job_id
}
kube_client: client.CoreV1Api = self.kube_client
for scheduler_job_id in scheduler_job_ids:
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 193b25a..bd6b2cc 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -375,6 +375,82 @@ class TestKubernetesExecutor(unittest.TestCase):
assert executor.event_buffer[key][0] == State.FAILED
mock_delete_pod.assert_called_once_with('pod_id', 'test-namespace')
+ @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task')
+ @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods')
+ def test_try_adopt_task_instances(self, mock_adopt_completed_pods, mock_adopt_launched_task):
+ executor = self.kubernetes_executor
+ executor.scheduler_job_id = "10"
+ mock_ti = mock.MagicMock(queued_by_job_id="1", external_executor_id="1", dag_id="dag", task_id="task")
+ pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="foo", labels={"dag_id": "dag", "task_id": "task"}))
+ pod_id = create_pod_id(dag_id="dag", task_id="task")
+ mock_kube_client = mock.MagicMock()
+ mock_kube_client.list_namespaced_pod.return_value.items = [pod]
+ executor.kube_client = mock_kube_client
+
+ # First adoption
+ executor.try_adopt_task_instances([mock_ti])
+ mock_kube_client.list_namespaced_pod.assert_called_once_with(
+ namespace='default', label_selector='airflow-worker=1'
+ )
+ mock_adopt_launched_task.assert_called_once_with(mock_kube_client, pod, {pod_id: mock_ti})
+ mock_adopt_completed_pods.assert_called_once()
+ # We aren't checking the return value of `try_adopt_task_instances` because it relies on
+ # `adopt_launched_task` mutating its arg. This should be refactored, but not right now.
+
+ # Second adoption (queued_by_job_id and external_executor_id no longer match)
+ mock_kube_client.reset_mock()
+ mock_adopt_launched_task.reset_mock()
+ mock_adopt_completed_pods.reset_mock()
+
+ mock_ti.queued_by_job_id = "10" # scheduler_job would have updated this after the first adoption
+ executor.scheduler_job_id = "20"
+
+ executor.try_adopt_task_instances([mock_ti])
+ mock_kube_client.list_namespaced_pod.assert_called_once_with(
+ namespace='default', label_selector='airflow-worker=10'
+ )
+ mock_adopt_launched_task.assert_called_once_with(mock_kube_client, pod, {pod_id: mock_ti})
+ mock_adopt_completed_pods.assert_called_once()
+
+ @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods')
+ def test_try_adopt_task_instances_multiple_scheduler_ids(self, mock_adopt_completed_pods):
+ """We try to find pods only once per scheduler id"""
+ executor = self.kubernetes_executor
+ mock_kube_client = mock.MagicMock()
+ executor.kube_client = mock_kube_client
+
+ mock_tis = [
+ mock.MagicMock(queued_by_job_id="10", external_executor_id="1", dag_id="dag", task_id="task"),
+ mock.MagicMock(queued_by_job_id="40", external_executor_id="1", dag_id="dag", task_id="task2"),
+ mock.MagicMock(queued_by_job_id="40", external_executor_id="1", dag_id="dag", task_id="task3"),
+ ]
+
+ executor.try_adopt_task_instances(mock_tis)
+ assert mock_kube_client.list_namespaced_pod.call_count == 2
+ mock_kube_client.list_namespaced_pod.assert_has_calls(
+ [
+ mock.call(namespace='default', label_selector='airflow-worker=10'),
+ mock.call(namespace='default', label_selector='airflow-worker=40'),
+ ],
+ any_order=True,
+ )
+
+ @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task')
+ @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods')
+ def test_try_adopt_task_instances_no_matching_pods(
+ self, mock_adopt_completed_pods, mock_adopt_launched_task
+ ):
+ executor = self.kubernetes_executor
+ mock_ti = mock.MagicMock(queued_by_job_id="1", external_executor_id="1", dag_id="dag", task_id="task")
+ mock_kube_client = mock.MagicMock()
+ mock_kube_client.list_namespaced_pod.return_value.items = []
+ executor.kube_client = mock_kube_client
+
+ tis_to_flush = executor.try_adopt_task_instances([mock_ti])
+ assert tis_to_flush == [mock_ti]
+ mock_adopt_launched_task.assert_not_called()
+ mock_adopt_completed_pods.assert_called_once()
+
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
def test_adopt_launched_task(self, mock_kube_client):
executor = self.kubernetes_executor