You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2024/02/10 21:58:57 UTC

(airflow) branch main updated: The task is stuck in a queued state forever in case of pod launch errors (#36882)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e99487946a The task is stuck in a queued state forever in case of pod launch errors  (#36882)
e99487946a is described below

commit e99487946a39849e078b52ac5b4a226359978254
Author: Gopal Dirisala <39...@users.noreply.github.com>
AuthorDate: Sun Feb 11 03:28:50 2024 +0530

    The task is stuck in a queued state forever in case of pod launch errors  (#36882)
---
 airflow/providers/cncf/kubernetes/CHANGELOG.rst    |  11 ++
 .../kubernetes/executors/kubernetes_executor.py    |  34 ++--
 airflow/providers/cncf/kubernetes/provider.yaml    |   9 ++
 .../executors/test_kubernetes_executor.py          | 173 ++++++++++++++++++---
 4 files changed, 199 insertions(+), 28 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/CHANGELOG.rst b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
index d4f7854aa3..c0e5c10f4e 100644
--- a/airflow/providers/cncf/kubernetes/CHANGELOG.rst
+++ b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
@@ -52,6 +52,17 @@ Misc
 * ``Changing wording in docstring for CNCF provider (#36547)``
 * ``Add support of Pendulum 3 (#36281)``
 
+Breaking changes
+~~~~~~~~~~~~~~~~
+
+For Kube API "exceeded quota" errors, the new ``task_publish_max_retries`` option controls
+how many times a task is re-queued. The default changed from unlimited retries to no retries
+(``task_publish_max_retries=0``); any other pod launch error now fails the task. For
+unlimited retries, set ``task_publish_max_retries=-1``. For a fixed number of retries, set
+``task_publish_max_retries`` to any positive integer.
+
+* ``Fix: The task is stuck in a queued state forever in case of pod launch errors (#36882)``
+
 .. Below changes are excluded from the changelog. Move them to
    appropriate section above if needed. Do not delete the lines(!):
    * ``Prepare docs 1st wave of Providers January 2024 (#36640)``
diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index a5d911f8ce..2209d72e01 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -28,7 +28,7 @@ import json
 import logging
 import multiprocessing
 import time
-from collections import defaultdict
+from collections import Counter, defaultdict
 from contextlib import suppress
 from datetime import datetime
 from queue import Empty, Queue
@@ -161,6 +161,8 @@ class KubernetesExecutor(BaseExecutor):
         self.event_scheduler: EventScheduler | None = None
         self.last_handled: dict[TaskInstanceKey, float] = {}
         self.kubernetes_queue: str | None = None
+        self.task_publish_retries: Counter[TaskInstanceKey] = Counter()
+        self.task_publish_max_retries = conf.getint("kubernetes", "task_publish_max_retries", fallback=0)
         super().__init__(parallelism=self.kube_config.parallelism)
 
     def _list_pods(self, query_kwargs):
@@ -425,7 +427,9 @@ class KubernetesExecutor(BaseExecutor):
                 task = self.task_queue.get_nowait()
 
                 try:
+                    key, command, kube_executor_config, pod_template_file = task
                     self.kube_scheduler.run_next(task)
+                    self.task_publish_retries.pop(key, None)
                 except PodReconciliationError as e:
                     self.log.error(
                         "Pod reconciliation failed, likely due to kubernetes library upgrade. "
@@ -434,19 +438,29 @@ class KubernetesExecutor(BaseExecutor):
                     )
                     self.fail(task[0], e)
                 except ApiException as e:
-                    # These codes indicate something is wrong with pod definition; otherwise we assume pod
-                    # definition is ok, and that retrying may work
-                    if e.status in (400, 422):
-                        self.log.error("Pod creation failed with reason %r. Failing task", e.reason)
-                        key, _, _, _ = task
-                        self.fail(key, e)
-                    else:
+                    body = json.loads(e.body)
+                    retries = self.task_publish_retries[key]
+                    # In case of exceeded quota errors, requeue the task as per the task_publish_max_retries
+                    if (
+                        e.status == 403
+                        and "exceeded quota" in body["message"]
+                        and (self.task_publish_max_retries == -1 or retries < self.task_publish_max_retries)
+                    ):
                         self.log.warning(
-                            "ApiException when attempting to run task, re-queueing. Reason: %r. Message: %s",
+                            "[Try %s of %s] Kube ApiException for Task: (%s). Reason: %r. Message: %s",
+                            self.task_publish_retries[key] + 1,
+                            self.task_publish_max_retries,
+                            key,
                             e.reason,
-                            json.loads(e.body)["message"],
+                            body["message"],
                         )
                         self.task_queue.put(task)
+                        self.task_publish_retries[key] = retries + 1
+                    else:
+                        self.log.error("Pod creation failed with reason %r. Failing task", e.reason)
+                        key, _, _, _ = task
+                        self.fail(key, e)
+                        self.task_publish_retries.pop(key, None)
                 except PodMutationHookException as e:
                     key, _, _, _ = task
                     self.log.error(
diff --git a/airflow/providers/cncf/kubernetes/provider.yaml b/airflow/providers/cncf/kubernetes/provider.yaml
index 787d94828a..ecefe4e24e 100644
--- a/airflow/providers/cncf/kubernetes/provider.yaml
+++ b/airflow/providers/cncf/kubernetes/provider.yaml
@@ -350,6 +350,15 @@ config:
         type: string
         example: ~
         default: ""
+      task_publish_max_retries:
+        description: |
+          The maximum number of retries for queuing the task to the Kubernetes scheduler when
+          failing due to Kube API exceeded quota errors before giving up and marking task as failed.
+          Set to -1 for unlimited retries.
+        version_added: ~
+        type: integer
+        example: ~
+        default: "0"
 
 executors:
   - airflow.providers.cncf.kubernetes.kubernetes_executor.KubernetesExecutor
diff --git a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
index 15b17515c1..b6c1a5224c 100644
--- a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -263,37 +263,170 @@ class TestKubernetesExecutor:
         AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed"
     )
     @pytest.mark.parametrize(
-        "status, should_requeue",
+        "response, task_publish_max_retries, should_requeue, task_expected_state",
         [
-            pytest.param(403, True, id="403 Forbidden"),
-            pytest.param(12345, True, id="12345 fake-unhandled-reason"),
-            pytest.param(422, False, id="422 Unprocessable Entity"),
-            pytest.param(400, False, id="400 BadRequest"),
+            pytest.param(
+                HTTPResponse(body='{"message": "any message"}', status=400),
+                0,
+                False,
+                State.FAILED,
+                id="400 BadRequest",
+            ),
+            pytest.param(
+                HTTPResponse(body='{"message": "any message"}', status=400),
+                1,
+                False,
+                State.FAILED,
+                id="400 BadRequest (task_publish_max_retries=1)",
+            ),
+            pytest.param(
+                HTTPResponse(body='{"message": "any message"}', status=400),
+                0,
+                False,
+                State.FAILED,
+                id="400 BadRequest",
+            ),
+            pytest.param(
+                HTTPResponse(body='{"message": "any message"}', status=400),
+                1,
+                False,
+                State.FAILED,
+                id="400 BadRequest (task_publish_max_retries=1)",
+            ),
+            pytest.param(
+                HTTPResponse(body='{"message": "any message"}', status=403),
+                0,
+                False,
+                State.FAILED,
+                id="403 Forbidden (permission denied)",
+            ),
+            pytest.param(
+                HTTPResponse(body='{"message": "any message"}', status=403),
+                1,
+                False,
+                State.FAILED,
+                id="403 Forbidden (permission denied) (task_publish_max_retries=1)",
+            ),
+            pytest.param(
+                HTTPResponse(
+                    body='{"message": "pods pod1 is forbidden: exceeded quota: '
+                    "resouece-quota, requested: pods=1, used: pods=10, "
+                    'limited: pods=10"}',
+                    status=403,
+                ),
+                0,
+                False,
+                State.FAILED,
+                id="403 Forbidden (exceeded quota)",
+            ),
+            pytest.param(
+                HTTPResponse(
+                    body='{"message": "pods pod1 is forbidden: exceeded quota: '
+                    "resouece-quota, requested: pods=1, used: pods=10, "
+                    'limited: pods=10"}',
+                    status=403,
+                ),
+                1,
+                True,
+                State.SUCCESS,
+                id="403 Forbidden (exceeded quota) (task_publish_max_retries=1) (retry succeeded)",
+            ),
+            pytest.param(
+                HTTPResponse(
+                    body='{"message": "pods pod1 is forbidden: exceeded quota: '
+                    "resouece-quota, requested: pods=1, used: pods=10, "
+                    'limited: pods=10"}',
+                    status=403,
+                ),
+                1,
+                True,
+                State.FAILED,
+                id="403 Forbidden (exceeded quota) (task_publish_max_retries=1)  (retry failed)",
+            ),
+            pytest.param(
+                HTTPResponse(body='{"message": "any message"}', status=404),
+                0,
+                False,
+                State.FAILED,
+                id="404 Not Found",
+            ),
+            pytest.param(
+                HTTPResponse(body='{"message": "any message"}', status=404),
+                1,
+                False,
+                State.FAILED,
+                id="404 Not Found (task_publish_max_retries=1)",
+            ),
+            pytest.param(
+                HTTPResponse(body='{"message": "any message"}', status=422),
+                0,
+                False,
+                State.FAILED,
+                id="422 Unprocessable Entity",
+            ),
+            pytest.param(
+                HTTPResponse(body='{"message": "any message"}', status=422),
+                1,
+                False,
+                State.FAILED,
+                id="422 Unprocessable Entity (task_publish_max_retries=1)",
+            ),
+            pytest.param(
+                HTTPResponse(body='{"message": "any message"}', status=12345),
+                0,
+                False,
+                State.FAILED,
+                id="12345 fake-unhandled-reason",
+            ),
+            pytest.param(
+                HTTPResponse(body='{"message": "any message"}', status=12345),
+                1,
+                False,
+                State.FAILED,
+                id="12345 fake-unhandled-reason (task_publish_max_retries=1) (retry succeeded)",
+            ),
+            pytest.param(
+                HTTPResponse(body='{"message": "any message"}', status=12345),
+                1,
+                False,
+                State.FAILED,
+                id="12345 fake-unhandled-reason (task_publish_max_retries=1) (retry failed)",
+            ),
         ],
     )
     @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")
     @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
     def test_run_next_exception_requeue(
-        self, mock_get_kube_client, mock_kubernetes_job_watcher, status, should_requeue
+        self,
+        mock_get_kube_client,
+        mock_kubernetes_job_watcher,
+        response,
+        task_publish_max_retries,
+        should_requeue,
+        task_expected_state,
     ):
         """
-        When pod scheduling fails with either reason 'Forbidden', or any reason not yet
-        handled in the relevant try-except block, the task should stay in the ``task_queue``
-        and be attempted on a subsequent executor sync.  When reason is 'Unprocessable Entity'
-        or 'BadRequest', the task should be failed without being re-queued.
+        When pod scheduling fails with a 403 'Forbidden' response caused by an exceeded quota
+        and task publish retries are not exhausted, the task should stay
+        in the ``task_queue`` and be attempted on a subsequent executor sync.
+        When the failure has any other reason (e.g. 'Unprocessable Entity' or 'BadRequest') or task
+        publish retries are exhausted, the task should be failed without being re-queued.
 
         Note on error scenarios:
 
-        - 403 Forbidden will be returned when your request exceeds namespace quota.
-        - 422 Unprocessable Entity is returned when your parameters are valid but unsupported
-            e.g. limits lower than requests.
-        - 400 BadRequest is returned when your parameters are invalid e.g. asking for cpu=100ABC123.
+        - 400 BadRequest is returned in scenarios like
+            - your request parameters are invalid e.g. asking for cpu=100ABC123.
+        - 403 Forbidden is returned in scenarios like
+            - your request exceeds the namespace quota
+            - scheduler role doesn't have permission to launch the pod
+        - 404 Not Found is returned in scenarios like
+            - your requested namespace doesn't exist
+        - 422 Unprocessable Entity is returned in scenarios like
+            - your request parameters are valid but unsupported e.g. limits lower than requests.
 
         """
         path = sys.path[0] + "/tests/providers/cncf/kubernetes/pod_generator_base_with_secrets.yaml"
 
-        response = HTTPResponse(body='{"message": "any message"}', status=status)
-
         # A mock kube_client that throws errors when making a pod
         mock_kube_client = mock.patch("kubernetes.client.CoreV1Api", autospec=True)
         mock_kube_client.create_namespaced_pod = mock.MagicMock(side_effect=ApiException(http_resp=response))
@@ -306,6 +439,7 @@ class TestKubernetesExecutor:
         }
         with conf_vars(config):
             kubernetes_executor = self.kubernetes_executor
+            kubernetes_executor.task_publish_max_retries = task_publish_max_retries
             kubernetes_executor.start()
             try:
                 # Execute a task while the Api Throws errors
@@ -324,16 +458,19 @@ class TestKubernetesExecutor:
                     assert not kubernetes_executor.task_queue.empty()
 
                     # Disable the ApiException
-                    mock_kube_client.create_namespaced_pod.side_effect = None
+                    if task_expected_state == State.SUCCESS:
+                        mock_kube_client.create_namespaced_pod.side_effect = None
 
                     # Execute the task without errors should empty the queue
                     mock_kube_client.create_namespaced_pod.reset_mock()
                     kubernetes_executor.sync()
                     assert mock_kube_client.create_namespaced_pod.called
                     assert kubernetes_executor.task_queue.empty()
+                    if task_expected_state != State.SUCCESS:
+                        assert kubernetes_executor.event_buffer[task_instance_key][0] == task_expected_state
                 else:
                     assert kubernetes_executor.task_queue.empty()
-                    assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED
+                    assert kubernetes_executor.event_buffer[task_instance_key][0] == task_expected_state
             finally:
                 kubernetes_executor.end()