You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/03/06 21:47:03 UTC

[airflow] 19/37: Be more selective when adopting pods with KubernetesExecutor (#28899)

This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 5a8ddaedd80638f9066f5af30c5695ba43ee9214
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Wed Jan 18 14:05:50 2023 -0600

    Be more selective when adopting pods with KubernetesExecutor (#28899)
    
    * Be more selective when adopting pods with KubernetesExecutor
    
    When trying to adopt "resettable" TIs from SchedulerJob, we should not
    list out all the pods to compare against, only those that didn't
    succeed. This means we will get any pods that are still starting,
    running, or failed (meaning the TI wasn't moved to a terminal state
    there, and will be in out "adoptable" list).
    
    This avoids the scenario where a dead scheduler has both a completed,
    successful worker, and a still running worker, causing log lines
    like these about the successful one:
    
        ERROR - attempting to adopt taskinstance which was not specified by
        database: TaskInstanceKey(...)
    
    This also makes sure we only find pods with the
    `kubernetes_executor=True` label for extra safety.
    
    Closes #28071
    
    * Also ignore done pods
    
    (cherry picked from commit f64ac5978fb3dfa9e40a0e5190ef88e9f9615824)
---
 airflow/executors/kubernetes_executor.py    | 32 +++++++++++++++++++++--------
 tests/executors/test_kubernetes_executor.py | 20 ++++++++++++++----
 2 files changed, 39 insertions(+), 13 deletions(-)

diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 34204539fa..412caea5c5 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -468,13 +468,16 @@ class KubernetesExecutor(BaseExecutor):
     @provide_session
     def clear_not_launched_queued_tasks(self, session=None) -> None:
         """
-        Tasks can end up in a "Queued" state through either the executor being
-        abruptly shut down (leaving a non-empty task_queue on this executor)
-        or when a rescheduled/deferred operator comes back up for execution
-        (with the same try_number) before the pod of its previous incarnation
-        has been fully removed (we think).
+        Clear tasks that were not yet launched, but were previously queued.
 
-        This method checks each of those tasks to see if the corresponding pod
+        Tasks can end up in a "Queued" state through when a rescheduled/deferred
+        operator comes back up for execution (with the same try_number) before the
+        pod of its previous incarnation has been fully removed (we think).
+
+        It's also possible when an executor abruptly shuts down (leaving a non-empty
+        task_queue on that executor), but that scenario is handled via normal adoption.
+
+        This method checks each of our queued tasks to see if the corresponding pod
         is around, and if not, and there's no matching entry in our own
         task_queue, marks it for re-execution.
         """
@@ -747,9 +750,20 @@ class KubernetesExecutor(BaseExecutor):
         kube_client: client.CoreV1Api = self.kube_client
         for scheduler_job_id in scheduler_job_ids:
             scheduler_job_id = pod_generator.make_safe_label_value(str(scheduler_job_id))
-            kwargs = {"label_selector": f"airflow-worker={scheduler_job_id}"}
-            pod_list = kube_client.list_namespaced_pod(namespace=self.kube_config.kube_namespace, **kwargs)
-            for pod in pod_list.items:
+            # We will look for any pods owned by the no-longer-running scheduler,
+            # but will exclude only successful pods, as those TIs will have a terminal state
+            # and not be up for adoption!
+            # Those workers that failed, however, are okay to adopt here as their TI will
+            # still be in queued.
+            query_kwargs = {
+                "field_selector": "status.phase!=Succeeded",
+                "label_selector": (
+                    "kubernetes_executor=True,"
+                    f"airflow-worker={scheduler_job_id},{POD_EXECUTOR_DONE_KEY}!=True"
+                ),
+            }
+            pod_list = self._list_pods(query_kwargs)
+            for pod in pod_list:
                 self.adopt_launched_task(kube_client, pod, pod_ids)
         self._adopt_completed_pods(kube_client)
         tis_to_flush.extend(pod_ids.values())
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index d24f3e8fd9..a56b6c9639 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -622,7 +622,9 @@ class TestKubernetesExecutor:
         # First adoption
         reset_tis = executor.try_adopt_task_instances([mock_ti])
         mock_kube_client.list_namespaced_pod.assert_called_once_with(
-            namespace="default", label_selector="airflow-worker=1"
+            namespace="default",
+            field_selector="status.phase!=Succeeded",
+            label_selector="kubernetes_executor=True,airflow-worker=1,airflow_executor_done!=True",
         )
         mock_adopt_launched_task.assert_called_once_with(mock_kube_client, pod, {ti_key: mock_ti})
         mock_adopt_completed_pods.assert_called_once()
@@ -640,7 +642,9 @@ class TestKubernetesExecutor:
 
         reset_tis = executor.try_adopt_task_instances([mock_ti])
         mock_kube_client.list_namespaced_pod.assert_called_once_with(
-            namespace="default", label_selector="airflow-worker=10"
+            namespace="default",
+            field_selector="status.phase!=Succeeded",
+            label_selector="kubernetes_executor=True,airflow-worker=10,airflow_executor_done!=True",
         )
         mock_adopt_launched_task.assert_called_once()  # Won't check args this time around as they get mutated
         mock_adopt_completed_pods.assert_called_once()
@@ -663,8 +667,16 @@ class TestKubernetesExecutor:
         assert mock_kube_client.list_namespaced_pod.call_count == 2
         mock_kube_client.list_namespaced_pod.assert_has_calls(
             [
-                mock.call(namespace="default", label_selector="airflow-worker=10"),
-                mock.call(namespace="default", label_selector="airflow-worker=40"),
+                mock.call(
+                    namespace="default",
+                    field_selector="status.phase!=Succeeded",
+                    label_selector="kubernetes_executor=True,airflow-worker=10,airflow_executor_done!=True",
+                ),
+                mock.call(
+                    namespace="default",
+                    field_selector="status.phase!=Succeeded",
+                    label_selector="kubernetes_executor=True,airflow-worker=40,airflow_executor_done!=True",
+                ),
             ],
             any_order=True,
         )