You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2024/02/10 21:58:57 UTC
(airflow) branch main updated: The task is stuck in a queued state forever in case of pod launch errors (#36882)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e99487946a The task is stuck in a queued state forever in case of pod launch errors (#36882)
e99487946a is described below
commit e99487946a39849e078b52ac5b4a226359978254
Author: Gopal Dirisala <39...@users.noreply.github.com>
AuthorDate: Sun Feb 11 03:28:50 2024 +0530
The task is stuck in a queued state forever in case of pod launch errors (#36882)
---
airflow/providers/cncf/kubernetes/CHANGELOG.rst | 11 ++
.../kubernetes/executors/kubernetes_executor.py | 34 ++--
airflow/providers/cncf/kubernetes/provider.yaml | 9 ++
.../executors/test_kubernetes_executor.py | 173 ++++++++++++++++++---
4 files changed, 199 insertions(+), 28 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/CHANGELOG.rst b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
index d4f7854aa3..c0e5c10f4e 100644
--- a/airflow/providers/cncf/kubernetes/CHANGELOG.rst
+++ b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
@@ -52,6 +52,17 @@ Misc
* ``Changing wording in docstring for CNCF provider (#36547)``
* ``Add support of Pendulum 3 (#36281)``
+Breaking changes
+~~~~~~~~~~~~~~~~
+
+For Kube API "exceeded quota" errors, we have introduced the ``task_publish_max_retries``
+config option to control task re-queuing. The default changed from unlimited retries to
+no retries (``task_publish_max_retries=0``). For unlimited retries, set it to ``-1``; for
+a fixed number of retries, set it to any positive integer. All other Kube API errors raised
+during pod creation are no longer re-queued and now fail the task immediately.
+
+* ``Fix: The task is stuck in a queued state forever in case of pod launch errors (#36882)``
+
.. Below changes are excluded from the changelog. Move them to
appropriate section above if needed. Do not delete the lines(!):
* ``Prepare docs 1st wave of Providers January 2024 (#36640)``
diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index a5d911f8ce..2209d72e01 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -28,7 +28,7 @@ import json
import logging
import multiprocessing
import time
-from collections import defaultdict
+from collections import Counter, defaultdict
from contextlib import suppress
from datetime import datetime
from queue import Empty, Queue
@@ -161,6 +161,8 @@ class KubernetesExecutor(BaseExecutor):
self.event_scheduler: EventScheduler | None = None
self.last_handled: dict[TaskInstanceKey, float] = {}
self.kubernetes_queue: str | None = None
+ self.task_publish_retries: Counter[TaskInstanceKey] = Counter()
+ self.task_publish_max_retries = conf.getint("kubernetes", "task_publish_max_retries", fallback=0)
super().__init__(parallelism=self.kube_config.parallelism)
def _list_pods(self, query_kwargs):
@@ -425,7 +427,9 @@ class KubernetesExecutor(BaseExecutor):
task = self.task_queue.get_nowait()
try:
+ key, command, kube_executor_config, pod_template_file = task
self.kube_scheduler.run_next(task)
+ self.task_publish_retries.pop(key, None)
except PodReconciliationError as e:
self.log.error(
"Pod reconciliation failed, likely due to kubernetes library upgrade. "
@@ -434,19 +438,29 @@ class KubernetesExecutor(BaseExecutor):
)
self.fail(task[0], e)
except ApiException as e:
- # These codes indicate something is wrong with pod definition; otherwise we assume pod
- # definition is ok, and that retrying may work
- if e.status in (400, 422):
- self.log.error("Pod creation failed with reason %r. Failing task", e.reason)
- key, _, _, _ = task
- self.fail(key, e)
- else:
+ body = json.loads(e.body)
+ retries = self.task_publish_retries[key]
+ # In case of exceeded quota errors, requeue the task as per the task_publish_max_retries
+ if (
+ e.status == 403
+ and "exceeded quota" in body["message"]
+ and (self.task_publish_max_retries == -1 or retries < self.task_publish_max_retries)
+ ):
self.log.warning(
- "ApiException when attempting to run task, re-queueing. Reason: %r. Message: %s",
+ "[Try %s of %s] Kube ApiException for Task: (%s). Reason: %r. Message: %s",
+ self.task_publish_retries[key] + 1,
+ self.task_publish_max_retries,
+ key,
e.reason,
- json.loads(e.body)["message"],
+ body["message"],
)
self.task_queue.put(task)
+ self.task_publish_retries[key] = retries + 1
+ else:
+ self.log.error("Pod creation failed with reason %r. Failing task", e.reason)
+ key, _, _, _ = task
+ self.fail(key, e)
+ self.task_publish_retries.pop(key, None)
except PodMutationHookException as e:
key, _, _, _ = task
self.log.error(
diff --git a/airflow/providers/cncf/kubernetes/provider.yaml b/airflow/providers/cncf/kubernetes/provider.yaml
index 787d94828a..ecefe4e24e 100644
--- a/airflow/providers/cncf/kubernetes/provider.yaml
+++ b/airflow/providers/cncf/kubernetes/provider.yaml
@@ -350,6 +350,15 @@ config:
type: string
example: ~
default: ""
+ task_publish_max_retries:
+ description: |
+ The maximum number of retries for queuing a task to the Kubernetes scheduler when it
+ fails due to Kube API "exceeded quota" errors, before giving up and marking the task as failed.
+ Set to -1 for unlimited retries.
+ version_added: ~
+ type: integer
+ example: ~
+ default: "0"
executors:
- airflow.providers.cncf.kubernetes.kubernetes_executor.KubernetesExecutor
diff --git a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
index 15b17515c1..b6c1a5224c 100644
--- a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -263,37 +263,170 @@ class TestKubernetesExecutor:
AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed"
)
@pytest.mark.parametrize(
- "status, should_requeue",
+ "response, task_publish_max_retries, should_requeue, task_expected_state",
[
- pytest.param(403, True, id="403 Forbidden"),
- pytest.param(12345, True, id="12345 fake-unhandled-reason"),
- pytest.param(422, False, id="422 Unprocessable Entity"),
- pytest.param(400, False, id="400 BadRequest"),
+ pytest.param(
+ HTTPResponse(body='{"message": "any message"}', status=400),
+ 0,
+ False,
+ State.FAILED,
+ id="400 BadRequest",
+ ),
+ pytest.param(
+ HTTPResponse(body='{"message": "any message"}', status=400),
+ 1,
+ False,
+ State.FAILED,
+ id="400 BadRequest (task_publish_max_retries=1)",
+ ),
+ pytest.param(
+ HTTPResponse(body='{"message": "any message"}', status=400),
+ 0,
+ False,
+ State.FAILED,
+ id="400 BadRequest",
+ ),
+ pytest.param(
+ HTTPResponse(body='{"message": "any message"}', status=400),
+ 1,
+ False,
+ State.FAILED,
+ id="400 BadRequest (task_publish_max_retries=1)",
+ ),
+ pytest.param(
+ HTTPResponse(body='{"message": "any message"}', status=403),
+ 0,
+ False,
+ State.FAILED,
+ id="403 Forbidden (permission denied)",
+ ),
+ pytest.param(
+ HTTPResponse(body='{"message": "any message"}', status=403),
+ 1,
+ False,
+ State.FAILED,
+ id="403 Forbidden (permission denied) (task_publish_max_retries=1)",
+ ),
+ pytest.param(
+ HTTPResponse(
+ body='{"message": "pods pod1 is forbidden: exceeded quota: '
+ "resouece-quota, requested: pods=1, used: pods=10, "
+ 'limited: pods=10"}',
+ status=403,
+ ),
+ 0,
+ False,
+ State.FAILED,
+ id="403 Forbidden (exceeded quota)",
+ ),
+ pytest.param(
+ HTTPResponse(
+ body='{"message": "pods pod1 is forbidden: exceeded quota: '
+ "resouece-quota, requested: pods=1, used: pods=10, "
+ 'limited: pods=10"}',
+ status=403,
+ ),
+ 1,
+ True,
+ State.SUCCESS,
+ id="403 Forbidden (exceeded quota) (task_publish_max_retries=1) (retry succeeded)",
+ ),
+ pytest.param(
+ HTTPResponse(
+ body='{"message": "pods pod1 is forbidden: exceeded quota: '
+ "resouece-quota, requested: pods=1, used: pods=10, "
+ 'limited: pods=10"}',
+ status=403,
+ ),
+ 1,
+ True,
+ State.FAILED,
+ id="403 Forbidden (exceeded quota) (task_publish_max_retries=1) (retry failed)",
+ ),
+ pytest.param(
+ HTTPResponse(body='{"message": "any message"}', status=404),
+ 0,
+ False,
+ State.FAILED,
+ id="404 Not Found",
+ ),
+ pytest.param(
+ HTTPResponse(body='{"message": "any message"}', status=404),
+ 1,
+ False,
+ State.FAILED,
+ id="404 Not Found (task_publish_max_retries=1)",
+ ),
+ pytest.param(
+ HTTPResponse(body='{"message": "any message"}', status=422),
+ 0,
+ False,
+ State.FAILED,
+ id="422 Unprocessable Entity",
+ ),
+ pytest.param(
+ HTTPResponse(body='{"message": "any message"}', status=422),
+ 1,
+ False,
+ State.FAILED,
+ id="422 Unprocessable Entity (task_publish_max_retries=1)",
+ ),
+ pytest.param(
+ HTTPResponse(body='{"message": "any message"}', status=12345),
+ 0,
+ False,
+ State.FAILED,
+ id="12345 fake-unhandled-reason",
+ ),
+ pytest.param(
+ HTTPResponse(body='{"message": "any message"}', status=12345),
+ 1,
+ False,
+ State.FAILED,
+ id="12345 fake-unhandled-reason (task_publish_max_retries=1) (retry succeeded)",
+ ),
+ pytest.param(
+ HTTPResponse(body='{"message": "any message"}', status=12345),
+ 1,
+ False,
+ State.FAILED,
+ id="12345 fake-unhandled-reason (task_publish_max_retries=1) (retry failed)",
+ ),
],
)
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
def test_run_next_exception_requeue(
- self, mock_get_kube_client, mock_kubernetes_job_watcher, status, should_requeue
+ self,
+ mock_get_kube_client,
+ mock_kubernetes_job_watcher,
+ response,
+ task_publish_max_retries,
+ should_requeue,
+ task_expected_state,
):
"""
- When pod scheduling fails with either reason 'Forbidden', or any reason not yet
- handled in the relevant try-except block, the task should stay in the ``task_queue``
- and be attempted on a subsequent executor sync. When reason is 'Unprocessable Entity'
- or 'BadRequest', the task should be failed without being re-queued.
+ When pod scheduling fails with a 403 Forbidden "exceeded quota" error
+ and task publish retries are not yet exhausted, the task should stay
+ in the ``task_queue`` and be attempted on a subsequent executor sync.
+ For any other ApiException (e.g. 'Unprocessable Entity' or 'BadRequest'), or once task publish
+ retries are exhausted, the task should be failed without being re-queued.
Note on error scenarios:
- - 403 Forbidden will be returned when your request exceeds namespace quota.
- - 422 Unprocessable Entity is returned when your parameters are valid but unsupported
- e.g. limits lower than requests.
- - 400 BadRequest is returned when your parameters are invalid e.g. asking for cpu=100ABC123.
+ - 400 BadRequest is returned in scenarios such as:
+ - your request parameters are invalid, e.g. asking for cpu=100ABC123.
+ - 403 Forbidden is returned in scenarios such as:
+ - your request exceeds the namespace quota
+ - the scheduler role doesn't have permission to launch the pod
+ - 404 Not Found is returned in scenarios such as:
+ - your requested namespace doesn't exist
+ - 422 Unprocessable Entity is returned in scenarios such as:
+ - your request parameters are valid but unsupported, e.g. limits lower than requests.
"""
path = sys.path[0] + "/tests/providers/cncf/kubernetes/pod_generator_base_with_secrets.yaml"
- response = HTTPResponse(body='{"message": "any message"}', status=status)
-
# A mock kube_client that throws errors when making a pod
mock_kube_client = mock.patch("kubernetes.client.CoreV1Api", autospec=True)
mock_kube_client.create_namespaced_pod = mock.MagicMock(side_effect=ApiException(http_resp=response))
@@ -306,6 +439,7 @@ class TestKubernetesExecutor:
}
with conf_vars(config):
kubernetes_executor = self.kubernetes_executor
+ kubernetes_executor.task_publish_max_retries = task_publish_max_retries
kubernetes_executor.start()
try:
# Execute a task while the Api Throws errors
@@ -324,16 +458,19 @@ class TestKubernetesExecutor:
assert not kubernetes_executor.task_queue.empty()
# Disable the ApiException
- mock_kube_client.create_namespaced_pod.side_effect = None
+ if task_expected_state == State.SUCCESS:
+ mock_kube_client.create_namespaced_pod.side_effect = None
# Execute the task without errors should empty the queue
mock_kube_client.create_namespaced_pod.reset_mock()
kubernetes_executor.sync()
assert mock_kube_client.create_namespaced_pod.called
assert kubernetes_executor.task_queue.empty()
+ if task_expected_state != State.SUCCESS:
+ assert kubernetes_executor.event_buffer[task_instance_key][0] == task_expected_state
else:
assert kubernetes_executor.task_queue.empty()
- assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED
+ assert kubernetes_executor.event_buffer[task_instance_key][0] == task_expected_state
finally:
kubernetes_executor.end()