You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2021/11/04 19:54:13 UTC

[airflow] branch v2-2-test updated: Task should fail immediately when pod is unprocessable (#19359)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-2-test by this push:
     new 8dc9541  Task should fail immediately when pod is unprocessable (#19359)
8dc9541 is described below

commit 8dc95415c68a4cef36eb4e02b5558288981891e2
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Wed Nov 3 17:22:55 2021 -0700

    Task should fail immediately when pod is unprocessable (#19359)
    
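    When a pod has invalid requirements (e.g. a resource limit lower than the
    resource request), the Kubernetes API may return "Unprocessable Entity".
    In this scenario, the Kubernetes executor should fail the task immediately
    rather than re-queue it to be attempted again. The executor now decides
    by HTTP status code: 400 (Bad Request) and 422 (Unprocessable Entity) fail
    the task; any other status re-queues it.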
    
    closes https://github.com/apache/airflow/issues/19320
    
    (cherry picked from commit eb12bb2f0418120be31cbcd8e8722528af9eb344)
---
 airflow/executors/kubernetes_executor.py    | 10 ++--
 tests/executors/test_kubernetes_executor.py | 75 ++++++++++++++++++++---------
 2 files changed, 58 insertions(+), 27 deletions(-)

diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index d1b60cd..ab37c6d 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -598,13 +598,17 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+
+                    # These codes indicate something is wrong with pod definition; otherwise we assume pod
+                    # definition is ok, and that retrying may work
+                    if e.status in (400, 422):
+                        self.log.error("Pod creation failed with reason %r. Failing task", e.reason)
                         key, _, _, _ = task
                         self.change_state(key, State.FAILED, e)
                     else:
                         self.log.warning(
-                            'ApiException when attempting to run task, re-queueing. Message: %s',
+                            'ApiException when attempting to run task, re-queueing. Reason: %r. Message: %s',
+                            e.reason,
                             json.loads(e.body)['message'],
                         )
                         self.task_queue.put(task)
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index e7911ce..cd72b06 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -182,33 +182,52 @@ class TestAirflowKubernetesScheduler(unittest.TestCase):
         mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
 
 
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
     """
     Tests if an ApiException from the Kube Client will cause the task to
     be rescheduled.
     """
 
-    def setUp(self) -> None:
+    def setup_method(self) -> None:
         self.kubernetes_executor = KubernetesExecutor()
         self.kubernetes_executor.job_id = "5"
 
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
+    @pytest.mark.parametrize(
+        'status, should_requeue',
+        [
+            pytest.param(403, True, id='403 Forbidden'),
+            pytest.param(12345, True, id='12345 fake-unhandled-reason'),
+            pytest.param(422, False, id='422 Unprocessable Entity'),
+            pytest.param(400, False, id='400 BadRequest'),
+        ],
+    )
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
-    def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher):
+    def test_run_next_exception_requeue(
+        self, mock_get_kube_client, mock_kubernetes_job_watcher, status, should_requeue
+    ):
+        """
+        When pod scheduling fails with either reason 'Forbidden', or any reason not yet
+        handled in the relevant try-except block, the task should stay in the ``task_queue``
+        and be attempted on a subsequent executor sync.  When reason is 'Unprocessable Entity'
+        or 'BadRequest', the task should be failed without being re-queued.
+
+        Note on error scenarios:
+
+        - 403 Forbidden will be returned when your request exceeds namespace quota.
+        - 422 Unprocessable Entity is returned when your parameters are valid but unsupported
+            e.g. limits lower than requests.
+        - 400 BadRequest is returned when your parameters are invalid e.g. asking for cpu=100ABC123.
+
+        """
         import sys
 
         path = sys.path[0] + '/tests/kubernetes/pod_generator_base_with_secrets.yaml'
 
-        # When a quota is exceeded this is the ApiException we get
-        response = HTTPResponse(
-            body='{"kind": "Status", "apiVersion": "v1", "metadata": {}, "status": "Failure", '
-            '"message": "pods \\"podname\\" is forbidden: exceeded quota: compute-resources, '
-            'requested: limits.memory=4Gi, used: limits.memory=6508Mi, limited: limits.memory=10Gi", '
-            '"reason": "Forbidden", "details": {"name": "podname", "kind": "pods"}, "code": 403}'
-        )
-        response.status = 403
-        response.reason = "Forbidden"
+        response = HTTPResponse(body='{"message": "any message"}', status=status)
 
         # A mock kube_client that throws errors when making a pod
         mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', autospec=True)
@@ -225,24 +244,30 @@ class TestKubernetesExecutor(unittest.TestCase):
             kubernetes_executor.start()
             # Execute a task while the Api Throws errors
             try_number = 1
+            task_instance_key = ('dag', 'task', 'run_id', try_number)
             kubernetes_executor.execute_async(
-                key=('dag', 'task', 'run_id', try_number),
+                key=task_instance_key,
                 queue=None,
                 command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
             )
             kubernetes_executor.sync()
-            kubernetes_executor.sync()
 
-            assert mock_kube_client.create_namespaced_pod.called
-            assert not kubernetes_executor.task_queue.empty()
+            assert mock_kube_client.create_namespaced_pod.call_count == 1
 
-            # Disable the ApiException
-            mock_kube_client.create_namespaced_pod.side_effect = None
+            if should_requeue:
+                assert not kubernetes_executor.task_queue.empty()
 
-            # Execute the task without errors should empty the queue
-            kubernetes_executor.sync()
-            assert mock_kube_client.create_namespaced_pod.called
-            assert kubernetes_executor.task_queue.empty()
+                # Disable the ApiException
+                mock_kube_client.create_namespaced_pod.side_effect = None
+
+                # Execute the task without errors should empty the queue
+                mock_kube_client.create_namespaced_pod.reset_mock()
+                kubernetes_executor.sync()
+                assert mock_kube_client.create_namespaced_pod.called
+                assert kubernetes_executor.task_queue.empty()
+            else:
+                assert kubernetes_executor.task_queue.empty()
+                assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED
 
     @mock.patch('airflow.executors.kubernetes_executor.KubeConfig')
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor.sync')
@@ -279,7 +304,9 @@ class TestKubernetesExecutor(unittest.TestCase):
         assert list(executor.event_buffer.values())[0][1] == "Invalid executor_config passed"
 
     @pytest.mark.execution_timeout(10)
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
     @mock.patch('airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.run_pod_async')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
     def test_pod_template_file_override_in_executor_config(self, mock_get_kube_client, mock_run_pod_async):