You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/04/26 21:19:18 UTC

[airflow] branch master updated: Fix Task Adoption in ``KubernetesExecutor`` (#14795)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 344e829  Fix Task Adoption in ``KubernetesExecutor`` (#14795)
344e829 is described below

commit 344e8296d7e8a9af61edd0aae4f7784ecf33e5a2
Author: atrbgithub <14...@users.noreply.github.com>
AuthorDate: Mon Apr 26 22:19:02 2021 +0100

    Fix Task Adoption in ``KubernetesExecutor`` (#14795)
    
    Ensure that we use ti.queued_by_job_id when searching for pods. The queued_by_job_id is used by
    adopt_launched_task when updating the labels. Without this change, after the scheduler is
    restarted a third time it no longer finds the pods, because it is still searching for the ID of
    the original scheduler (ti.external_executor_id) rather than the ID of the scheduler that most
    recently adopted the tasks.
    
    Co-Authored-By: samwedge <19...@users.noreply.github.com>
    Co-Authored-By: philip-hope <64...@users.noreply.github.com>
    Co-authored-by: Jed Cunningham <66...@users.noreply.github.com>
---
 airflow/executors/kubernetes_executor.py    |  6 +--
 tests/executors/test_kubernetes_executor.py | 76 +++++++++++++++++++++++++++++
 2 files changed, 79 insertions(+), 3 deletions(-)

diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 4078976..a2ae701 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -651,15 +651,15 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
         self.event_buffer[key] = state, None
 
     def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance]:
-        tis_to_flush = [ti for ti in tis if not ti.external_executor_id]
-        scheduler_job_ids = [ti.external_executor_id for ti in tis]
+        tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
+        scheduler_job_ids = {ti.queued_by_job_id for ti in tis}
         pod_ids = {
             create_pod_id(
                 dag_id=pod_generator.make_safe_label_value(ti.dag_id),
                 task_id=pod_generator.make_safe_label_value(ti.task_id),
             ): ti
             for ti in tis
-            if ti.external_executor_id
+            if ti.queued_by_job_id
         }
         kube_client: client.CoreV1Api = self.kube_client
         for scheduler_job_id in scheduler_job_ids:
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 193b25a..bd6b2cc 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -375,6 +375,82 @@ class TestKubernetesExecutor(unittest.TestCase):
         assert executor.event_buffer[key][0] == State.FAILED
         mock_delete_pod.assert_called_once_with('pod_id', 'test-namespace')
 
+    @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task')
+    @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods')
+    def test_try_adopt_task_instances(self, mock_adopt_completed_pods, mock_adopt_launched_task):
+        executor = self.kubernetes_executor
+        executor.scheduler_job_id = "10"
+        mock_ti = mock.MagicMock(queued_by_job_id="1", external_executor_id="1", dag_id="dag", task_id="task")
+        pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="foo", labels={"dag_id": "dag", "task_id": "task"}))
+        pod_id = create_pod_id(dag_id="dag", task_id="task")
+        mock_kube_client = mock.MagicMock()
+        mock_kube_client.list_namespaced_pod.return_value.items = [pod]
+        executor.kube_client = mock_kube_client
+
+        # First adoption
+        executor.try_adopt_task_instances([mock_ti])
+        mock_kube_client.list_namespaced_pod.assert_called_once_with(
+            namespace='default', label_selector='airflow-worker=1'
+        )
+        mock_adopt_launched_task.assert_called_once_with(mock_kube_client, pod, {pod_id: mock_ti})
+        mock_adopt_completed_pods.assert_called_once()
+        # We aren't checking the return value of `try_adopt_task_instances` because it relies on
+        # `adopt_launched_task` mutating its arg. This should be refactored, but not right now.
+
+        # Second adoption (queued_by_job_id and external_executor_id no longer match)
+        mock_kube_client.reset_mock()
+        mock_adopt_launched_task.reset_mock()
+        mock_adopt_completed_pods.reset_mock()
+
+        mock_ti.queued_by_job_id = "10"  # scheduler_job would have updated this after the first adoption
+        executor.scheduler_job_id = "20"
+
+        executor.try_adopt_task_instances([mock_ti])
+        mock_kube_client.list_namespaced_pod.assert_called_once_with(
+            namespace='default', label_selector='airflow-worker=10'
+        )
+        mock_adopt_launched_task.assert_called_once_with(mock_kube_client, pod, {pod_id: mock_ti})
+        mock_adopt_completed_pods.assert_called_once()
+
+    @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods')
+    def test_try_adopt_task_instances_multiple_scheduler_ids(self, mock_adopt_completed_pods):
+        """We try to find pods only once per scheduler id"""
+        executor = self.kubernetes_executor
+        mock_kube_client = mock.MagicMock()
+        executor.kube_client = mock_kube_client
+
+        mock_tis = [
+            mock.MagicMock(queued_by_job_id="10", external_executor_id="1", dag_id="dag", task_id="task"),
+            mock.MagicMock(queued_by_job_id="40", external_executor_id="1", dag_id="dag", task_id="task2"),
+            mock.MagicMock(queued_by_job_id="40", external_executor_id="1", dag_id="dag", task_id="task3"),
+        ]
+
+        executor.try_adopt_task_instances(mock_tis)
+        assert mock_kube_client.list_namespaced_pod.call_count == 2
+        mock_kube_client.list_namespaced_pod.assert_has_calls(
+            [
+                mock.call(namespace='default', label_selector='airflow-worker=10'),
+                mock.call(namespace='default', label_selector='airflow-worker=40'),
+            ],
+            any_order=True,
+        )
+
+    @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task')
+    @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods')
+    def test_try_adopt_task_instances_no_matching_pods(
+        self, mock_adopt_completed_pods, mock_adopt_launched_task
+    ):
+        executor = self.kubernetes_executor
+        mock_ti = mock.MagicMock(queued_by_job_id="1", external_executor_id="1", dag_id="dag", task_id="task")
+        mock_kube_client = mock.MagicMock()
+        mock_kube_client.list_namespaced_pod.return_value.items = []
+        executor.kube_client = mock_kube_client
+
+        tis_to_flush = executor.try_adopt_task_instances([mock_ti])
+        assert tis_to_flush == [mock_ti]
+        mock_adopt_launched_task.assert_not_called()
+        mock_adopt_completed_pods.assert_called_once()
+
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
     def test_adopt_launched_task(self, mock_kube_client):
         executor = self.kubernetes_executor