You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2021/11/04 19:54:13 UTC
[airflow] branch v2-2-test updated: Task should fail immediately
when pod is unprocessable (#19359)
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-2-test by this push:
new 8dc9541 Task should fail immediately when pod is unprocessable (#19359)
8dc9541 is described below
commit 8dc95415c68a4cef36eb4e02b5558288981891e2
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Wed Nov 3 17:22:55 2021 -0700
Task should fail immediately when pod is unprocessable (#19359)
When a pod has invalid requirements, e.g. resource limit < resource request,
the Kubernetes API may return "Unprocessable Entity". In this scenario,
the Kubernetes executor should fail the task immediately, rather than
re-queue it to be attempted again.
closes https://github.com/apache/airflow/issues/19320
(cherry picked from commit eb12bb2f0418120be31cbcd8e8722528af9eb344)
---
airflow/executors/kubernetes_executor.py | 10 ++--
tests/executors/test_kubernetes_executor.py | 75 ++++++++++++++++++++---------
2 files changed, 58 insertions(+), 27 deletions(-)
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index d1b60cd..ab37c6d 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -598,13 +598,17 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
try:
self.kube_scheduler.run_next(task)
except ApiException as e:
- if e.reason == "BadRequest":
- self.log.error("Request was invalid. Failing task")
+
+ # These codes indicate something is wrong with pod definition; otherwise we assume pod
+ # definition is ok, and that retrying may work
+ if e.status in (400, 422):
+ self.log.error("Pod creation failed with reason %r. Failing task", e.reason)
key, _, _, _ = task
self.change_state(key, State.FAILED, e)
else:
self.log.warning(
- 'ApiException when attempting to run task, re-queueing. Message: %s',
+ 'ApiException when attempting to run task, re-queueing. Reason: %r. Message: %s',
+ e.reason,
json.loads(e.body)['message'],
)
self.task_queue.put(task)
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index e7911ce..cd72b06 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -182,33 +182,52 @@ class TestAirflowKubernetesScheduler(unittest.TestCase):
mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
"""
Tests if an ApiException from the Kube Client will cause the task to
be rescheduled.
"""
- def setUp(self) -> None:
+ def setup_method(self) -> None:
self.kubernetes_executor = KubernetesExecutor()
self.kubernetes_executor.job_id = "5"
- @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+ @pytest.mark.skipif(
+ AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+ )
+ @pytest.mark.parametrize(
+ 'status, should_requeue',
+ [
+ pytest.param(403, True, id='403 Forbidden'),
+ pytest.param(12345, True, id='12345 fake-unhandled-reason'),
+ pytest.param(422, False, id='422 Unprocessable Entity'),
+ pytest.param(400, False, id='400 BadRequest'),
+ ],
+ )
@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
- def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher):
+ def test_run_next_exception_requeue(
+ self, mock_get_kube_client, mock_kubernetes_job_watcher, status, should_requeue
+ ):
+ """
+ When pod scheduling fails with either reason 'Forbidden', or any reason not yet
+ handled in the relevant try-except block, the task should stay in the ``task_queue``
+ and be attempted on a subsequent executor sync. When reason is 'Unprocessable Entity'
+ or 'BadRequest', the task should be failed without being re-queued.
+
+ Note on error scenarios:
+
+ - 403 Forbidden will be returned when your request exceeds namespace quota.
+ - 422 Unprocessable Entity is returned when your parameters are valid but unsupported
+ e.g. limits lower than requests.
+ - 400 BadRequest is returned when your parameters are invalid e.g. asking for cpu=100ABC123.
+
+ """
import sys
path = sys.path[0] + '/tests/kubernetes/pod_generator_base_with_secrets.yaml'
- # When a quota is exceeded this is the ApiException we get
- response = HTTPResponse(
- body='{"kind": "Status", "apiVersion": "v1", "metadata": {}, "status": "Failure", '
- '"message": "pods \\"podname\\" is forbidden: exceeded quota: compute-resources, '
- 'requested: limits.memory=4Gi, used: limits.memory=6508Mi, limited: limits.memory=10Gi", '
- '"reason": "Forbidden", "details": {"name": "podname", "kind": "pods"}, "code": 403}'
- )
- response.status = 403
- response.reason = "Forbidden"
+ response = HTTPResponse(body='{"message": "any message"}', status=status)
# A mock kube_client that throws errors when making a pod
mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', autospec=True)
@@ -225,24 +244,30 @@ class TestKubernetesExecutor(unittest.TestCase):
kubernetes_executor.start()
# Execute a task while the Api Throws errors
try_number = 1
+ task_instance_key = ('dag', 'task', 'run_id', try_number)
kubernetes_executor.execute_async(
- key=('dag', 'task', 'run_id', try_number),
+ key=task_instance_key,
queue=None,
command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
)
kubernetes_executor.sync()
- kubernetes_executor.sync()
- assert mock_kube_client.create_namespaced_pod.called
- assert not kubernetes_executor.task_queue.empty()
+ assert mock_kube_client.create_namespaced_pod.call_count == 1
- # Disable the ApiException
- mock_kube_client.create_namespaced_pod.side_effect = None
+ if should_requeue:
+ assert not kubernetes_executor.task_queue.empty()
- # Execute the task without errors should empty the queue
- kubernetes_executor.sync()
- assert mock_kube_client.create_namespaced_pod.called
- assert kubernetes_executor.task_queue.empty()
+ # Disable the ApiException
+ mock_kube_client.create_namespaced_pod.side_effect = None
+
+ # Execute the task without errors should empty the queue
+ mock_kube_client.create_namespaced_pod.reset_mock()
+ kubernetes_executor.sync()
+ assert mock_kube_client.create_namespaced_pod.called
+ assert kubernetes_executor.task_queue.empty()
+ else:
+ assert kubernetes_executor.task_queue.empty()
+ assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED
@mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
@mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor.sync')
@@ -279,7 +304,9 @@ class TestKubernetesExecutor(unittest.TestCase):
assert list(executor.event_buffer.values())[0][1] == "Invalid executor_config passed"
@pytest.mark.execution_timeout(10)
- @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+ @pytest.mark.skipif(
+ AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+ )
@mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.run_pod_async')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
def test_pod_template_file_override_in_executor_config(self, mock_get_kube_client, mock_run_pod_async):