Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/02 02:02:34 UTC

[GitHub] [airflow] dstandish opened a new pull request #19359: Task should fail immediately when pod is unprocessable

dstandish opened a new pull request #19359:
URL: https://github.com/apache/airflow/pull/19359


   When a pod has invalid requirements, e.g. resource limit < resource request,
   the Kubernetes API may return "Unprocessable Entity".  In this scenario,
   the Kubernetes executor should fail the task immediately, rather than
   requeue it to be attempted again.
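
A minimal sketch of the behavior described above, not the executor's actual code. `FakeApiException` and `handle_pod_creation_error` are hypothetical names used only for illustration (the real exception is the kubernetes client's `ApiException`), and treating 400 and 422 as non-retryable is the assumption taken from this pull request.

```python
class FakeApiException(Exception):
    """Hypothetical stand-in for the kubernetes client's ApiException."""

    def __init__(self, status: int, reason: str) -> None:
        super().__init__(f"({status}) {reason}")
        self.status = status
        self.reason = reason


# Assumption: these statuses mean the pod spec itself is bad,
# so retrying the same request cannot succeed.
NON_RETRYABLE_STATUSES = (400, 422)


def handle_pod_creation_error(exc: FakeApiException) -> str:
    """Return 'fail' for an unrecoverable pod spec, 'requeue' otherwise."""
    if exc.status in NON_RETRYABLE_STATUSES:
        return "fail"
    return "requeue"
```

With this sketch, a 422 response fails the task, while a 403 (for example, a namespace quota that may free up later) is requeued.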
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on a change in pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r742236709



##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -225,24 +251,30 @@ def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watc
             kubernetes_executor.start()
             # Execute a task while the Api Throws errors
             try_number = 1
+            task_instance_key = ('dag', 'task', 'run_id', try_number)
             kubernetes_executor.execute_async(
-                key=('dag', 'task', 'run_id', try_number),
+                key=task_instance_key,
                 queue=None,
                 command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
             )
             kubernetes_executor.sync()
             kubernetes_executor.sync()
 
             assert mock_kube_client.create_namespaced_pod.called

Review comment:
       Yeah, it's odd...
   
   The test actually calls sync twice before this line,
   
   so depending on the scenario the call count could be 2.
   
   But I will remove one of the sync calls and assert == 1; the second call seems to have been a mistake.
   







[GitHub] [airflow] dstandish commented on a change in pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r742229711



##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,52 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl
         mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
 
 
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
     """
     Tests if an ApiException from the Kube Client will cause the task to
     be rescheduled.
     """
 
-    def setUp(self) -> None:
+    def setup_method(self) -> None:
         self.kubernetes_executor = KubernetesExecutor()
         self.kubernetes_executor.job_id = "5"
 
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
+    @pytest.mark.parametrize(
+        'reason, status, should_requeue',
+        [
+            ('Forbidden', 403, True),
+            ('fake-unhandled-reason', 12345, True),
+            ('Unprocessable Entity', 422, False),
+            ('BadRequest', 400, False),
+        ],
+    )
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
-    def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher):
+    def test_run_next_exception_requeue(
+        self, mock_get_kube_client, mock_kubernetes_job_watcher, reason, status, should_requeue

Review comment:
       though maybe this approach is better 🤷:
   ![image](https://user-images.githubusercontent.com/15932138/140171963-4d869370-f9f5-4894-9663-68a82ec87db8.png)
   







[GitHub] [airflow] jedcunningham closed pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
jedcunningham closed pull request #19359:
URL: https://github.com/apache/airflow/pull/19359









[GitHub] [airflow] jedcunningham commented on a change in pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r742159599



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.status in (400, 422):
+                        self.log.error("Pod creation failed with reason %r. Failing task", e.reason)

Review comment:
       Ah, gotcha, no that makes sense. Thanks.







[GitHub] [airflow] dstandish commented on a change in pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r741325058



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.reason in ("BadRequest", "Unprocessable Entity"):
+                        self.log.error(f"Pod creation failed with reason {e.reason!r}. Failing task")

Review comment:
       Do you mean we should use the status code instead of the reason text?
   
   It looks like we could do that:
   
   ```
           - 403 Forbidden will be returned when your request exceeds namespace quota.
           - 422 Unprocessable Entity is returned when your parameters are valid but unsupported
               e.g. limits lower than requests.
           - 400 BadRequest is returned when your parameters are invalid e.g. asking for cpu=100ABC123.
   ```
   
   I guess one reason to use `reason` in the if statement, though, is that it's English.  So we can read
   
   ```
   if e.reason in ("BadRequest", "Unprocessable Entity"):
   ```
   
   rather than
   
   ```
   if e.status in (400, 422):
   ```
   
   because the former is more descriptive / self-documenting.  
   
   But I also don't have a strong opinion on this.
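
One way to get both readability and a machine-oriented check, offered only as a sketch: Python's standard `http.HTTPStatus` members are integers with descriptive names. The helper name `should_fail_immediately` is hypothetical and is not part of the pull request.

```python
from http import HTTPStatus

# HTTPStatus members compare equal to plain integers, so the check is made
# against the status code while still reading like English.
NON_RETRYABLE = (HTTPStatus.BAD_REQUEST, HTTPStatus.UNPROCESSABLE_ENTITY)


def should_fail_immediately(status: int) -> bool:
    """Hypothetical helper: True when the status is 400 or 422."""
    return status in NON_RETRYABLE
```

This keeps the comparison independent of the wording of `reason`, at the cost of one extra import.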







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r740794503



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.reason in ("BadRequest", "Unprocessable Entity"):
+                        self.log.error(f"Pod creation failed with reason {e.reason!r}. Failing task")

Review comment:
       I prefer we use `e.status` for this and give different error messages for BadRequest error and 422. No strong opinion though







[GitHub] [airflow] github-actions[bot] commented on pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#issuecomment-959863261


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] jedcunningham commented on a change in pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r742106170



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.status in (400, 422):

Review comment:
       Might be nice to bring the comment from the tests here as well to explain why we might see these 2 responses.

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -225,24 +251,30 @@ def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watc
             kubernetes_executor.start()
             # Execute a task while the Api Throws errors
             try_number = 1
+            task_instance_key = ('dag', 'task', 'run_id', try_number)
             kubernetes_executor.execute_async(
-                key=('dag', 'task', 'run_id', try_number),
+                key=task_instance_key,
                 queue=None,
                 command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
             )
             kubernetes_executor.sync()
             kubernetes_executor.sync()
 
             assert mock_kube_client.create_namespaced_pod.called
-            assert not kubernetes_executor.task_queue.empty()
 
-            # Disable the ApiException
-            mock_kube_client.create_namespaced_pod.side_effect = None
+            if should_requeue:
+                assert not kubernetes_executor.task_queue.empty()
 
-            # Execute the task without errors should empty the queue
-            kubernetes_executor.sync()
-            assert mock_kube_client.create_namespaced_pod.called
-            assert kubernetes_executor.task_queue.empty()
+                # Disable the ApiException
+                mock_kube_client.create_namespaced_pod.side_effect = None
+
+                # Execute the task without errors should empty the queue
+                kubernetes_executor.sync()

Review comment:
       ```suggestion
                   mock_kube_client.create_namespaced_pod.reset_mock()
                   kubernetes_executor.sync()
   ```
   
   We need to reset the mock, otherwise we aren't checking it was actually called again.
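
A small standalone illustration of the point above, using only `unittest.mock`. The name `create_pod` is a hypothetical stand-in for `mock_kube_client.create_namespaced_pod`.

```python
from unittest import mock

create_pod = mock.Mock()  # hypothetical stand-in for create_namespaced_pod

create_pod()  # first call, e.g. during the sync that hits the ApiException

# Without a reset, `.called` stays True even if nothing calls the mock again,
# so a later `assert create_pod.called` proves nothing about a retry.
still_true_without_second_call = create_pod.called

create_pod.reset_mock()  # clears the recorded call history
called_after_reset = create_pod.called

create_pod()  # second call, e.g. the retry after the error is disabled
called_after_retry = create_pod.called
```

Note that `reset_mock()` by default leaves `side_effect` and `return_value` configured, so it only clears the call record.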

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -225,24 +251,30 @@ def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watc
             kubernetes_executor.start()
             # Execute a task while the Api Throws errors
             try_number = 1
+            task_instance_key = ('dag', 'task', 'run_id', try_number)
             kubernetes_executor.execute_async(
-                key=('dag', 'task', 'run_id', try_number),
+                key=task_instance_key,
                 queue=None,
                 command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
             )
             kubernetes_executor.sync()
             kubernetes_executor.sync()
 
             assert mock_kube_client.create_namespaced_pod.called

Review comment:
       ```suggestion
               mock_kube_client.create_namespaced_pod.assert_called_once()
   ```
   or
   ```suggestion
               assert mock_kube_client.create_namespaced_pod.call_count == 1
   ```
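
To show why the stricter assertion matters, here is a sketch in which a mock is called twice. `create_pod` is again a hypothetical stand-in for the real mocked client method.

```python
from unittest import mock

create_pod = mock.Mock()  # hypothetical stand-in for create_namespaced_pod
create_pod()
create_pod()

# `.called` is True for any number of calls >= 1, so it cannot detect
# an unexpected extra call.
loose_check_passes = create_pod.called

# `assert_called_once()` raises AssertionError unless the count is exactly 1.
try:
    create_pod.assert_called_once()
    strict_check_passes = True
except AssertionError:
    strict_check_passes = False
```

Either suggested form (`assert_called_once()` or `call_count == 1`) would flag the double call that `.called` lets through.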

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,59 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl
         mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
 
 
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
     """
     Tests if an ApiException from the Kube Client will cause the task to
     be rescheduled.
     """
 
-    def setUp(self) -> None:
+    def setup_method(self) -> None:
         self.kubernetes_executor = KubernetesExecutor()
         self.kubernetes_executor.job_id = "5"
 
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
+    @pytest.mark.parametrize(
+        'reason, should_requeue',
+        [
+            ('Forbidden', True),
+            ('fake-unhandled-reason', True),
+            ('Unprocessable Entity', False),
+            ('BadRequest', False),

Review comment:
       I think I'd prefer to see the status code here instead, particularly since we aren't using the `reason` for anything. Then we can also drop the map from inside the test body too.
   
   If we do need both, I think the mapping should happen here anyways.

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.status in (400, 422):
+                        self.log.error("Pod creation failed with reason %r. Failing task", e.reason)

Review comment:
       ```suggestion
                           self.log.error("Failing Task. Pod creation failed with reason: %r, message: %r", e.reason, json.loads(e.body)['message'],)
   ```
   
   Does it make sense to include the message too? (warning, untested)
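
Since the suggestion above is flagged as untested, here is a defensive sketch of the message extraction. It assumes the error body is a JSON document with a `message` key, which may not always hold (the body could be missing or not JSON). `extract_message` is a hypothetical helper, not part of Airflow or the kubernetes client.

```python
import json


def extract_message(body) -> str:
    """Best-effort extraction of the 'message' field from an API error body."""
    try:
        return str(json.loads(body)["message"])
    except (TypeError, ValueError, KeyError):
        # body was None, was not valid JSON, or had no 'message' key
        return "<no message available>"
```

A fallback like this avoids raising a second exception from inside the `except ApiException` handler when the body has an unexpected shape.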







[GitHub] [airflow] kaxil merged pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #19359:
URL: https://github.com/apache/airflow/pull/19359


   





[GitHub] [airflow] dstandish commented on a change in pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r742149639



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.status in (400, 422):
+                        self.log.error("Pod creation failed with reason %r. Failing task", e.reason)

Review comment:
       In this case, the exception is already logged within `run_pod_async`, which catches, logs, and reraises.
   
   Probably anything that is going to immediately reraise shouldn't bother logging (so that the logging is controlled by whatever is handling the exception).  But since that's what the code does here, it is probably better to log only once, because the output can be a bit verbose. Let me know your thoughts.










[GitHub] [airflow] jedcunningham commented on a change in pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r742222067



##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,52 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl
         mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
 
 
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
     """
     Tests if an ApiException from the Kube Client will cause the task to
     be rescheduled.
     """
 
-    def setUp(self) -> None:
+    def setup_method(self) -> None:
         self.kubernetes_executor = KubernetesExecutor()
         self.kubernetes_executor.job_id = "5"
 
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
+    @pytest.mark.parametrize(
+        'reason, status, should_requeue',
+        [
+            ('Forbidden', 403, True),
+            ('fake-unhandled-reason', 12345, True),
+            ('Unprocessable Entity', 422, False),
+            ('BadRequest', 400, False),
+        ],
+    )
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
-    def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher):
+    def test_run_next_exception_requeue(
+        self, mock_get_kube_client, mock_kubernetes_job_watcher, reason, status, should_requeue

Review comment:
       `reason` isn't used, right?







[GitHub] [airflow] uranusjr commented on a change in pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r741568162



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.reason in ("BadRequest", "Unprocessable Entity"):
+                        self.log.error(f"Pod creation failed with reason {e.reason!r}. Failing task")

Review comment:
       One reason (uh) against using `reason` is that it's more brittle. `reason` is intended to be human-readable and could change (e.g. `BadRequest` to `Bad Request`), while the status code is meant for machines and is more suitable for Airflow's use case here, IMO.
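
A minimal sketch of the brittleness argument, assuming a hypothetical `FakeApiException` that stands in for the Kubernetes client's `ApiException` (only `status` and `reason` are modelled). The helper names are illustrative and are not Airflow code.

```python
class FakeApiException(Exception):
    """Hypothetical stand-in for the Kubernetes client's ApiException."""

    def __init__(self, status: int, reason: str) -> None:
        super().__init__(f"({status}) {reason}")
        self.status = status
        self.reason = reason


def fails_fast_by_reason(exc: FakeApiException) -> bool:
    # Matches the human-readable text, so a respelling silently stops matching.
    return exc.reason in ("BadRequest", "Unprocessable Entity")


def fails_fast_by_status(exc: FakeApiException) -> bool:
    # Matches the numeric HTTP status, which does not depend on wording.
    return exc.status in (400, 422)


# Imagine the reason text is respelled while the status code stays the same.
respelled = FakeApiException(400, "Bad Request")
```

With the respelled reason, the reason-based check no longer treats the error as fail-fast, while the status-based check still does.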







[GitHub] [airflow] jedcunningham closed pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
jedcunningham closed pull request #19359:
URL: https://github.com/apache/airflow/pull/19359


   





[GitHub] [airflow] github-actions[bot] commented on pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#issuecomment-959863261


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.








[GitHub] [airflow] dstandish commented on a change in pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r741641790



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.reason in ("BadRequest", "Unprocessable Entity"):
+                        self.log.error(f"Pod creation failed with reason {e.reason!r}. Failing task")

Review comment:
       yeah i can see that point.  another reason i went with `reason` was, well, that the code was already using `reason` for `BadRequest`.  it also occurred to me that status codes might have less fidelity than reasons, i.e. be less precise, so that switching to status code might catch more scenarios in this block than originally intended. e.g. if 400 can be both `BadRequest` and `LessBadRequest`, we might start catching something we weren't supposed to.
   
   but i've seen no evidence of this, and since the majority seems to be in favor of status code, i'll make the change.







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r740794503



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.reason in ("BadRequest", "Unprocessable Entity"):
+                        self.log.error(f"Pod creation failed with reason {e.reason!r}. Failing task")

Review comment:
       I prefer we use `e.status` for this and give different error messages for BadRequest error and 422. No strong opinion though
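
One way the "different error messages" idea could look, as a sketch only. The 400 message is the one already in the diff; the 422 wording and the names `FAIL_FAST_MESSAGES` and `failure_message` are assumptions made for illustration.

```python
from typing import Optional

# Statuses that should fail the task immediately, each with its own message.
FAIL_FAST_MESSAGES = {
    400: "Request was invalid. Failing task",
    # Hypothetical wording for the 422 case; not taken from the PR.
    422: "Pod spec was rejected as unprocessable. Failing task",
}


def failure_message(status: int) -> Optional[str]:
    """Return the log message for a fail-fast status, or None to requeue."""
    return FAIL_FAST_MESSAGES.get(status)
```

A caller would log the returned message and fail the task when it is not `None`, and requeue otherwise.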







[GitHub] [airflow] kaxil commented on a change in pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r741511783



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.reason in ("BadRequest", "Unprocessable Entity"):
+                        self.log.error(f"Pod creation failed with reason {e.reason!r}. Failing task")

Review comment:
       ```suggestion
                       if e.reason in ("BadRequest", "Unprocessable Entity"):
                           self.log.error("Pod creation failed with reason %r. Failing task", e.reason)
   ```
   ```suggestion
                       if e.reason in ("BadRequest", "Unprocessable Entity"):
                           self.log.error(f"Pod creation failed with reason {e.reason!r}. Failing task")
   ```
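
The thread does not state why the `%r` form is suggested. One commonly cited difference is that `logging` defers interpolation until a record is actually emitted, whereas an f-string is always formatted. The sketch below illustrates that with a hypothetical `Probe` object that counts its own `repr` calls; the logger name is arbitrary.

```python
import logging


class Probe:
    """Hypothetical helper that counts how often it is formatted."""

    def __init__(self) -> None:
        self.repr_calls = 0

    def __repr__(self) -> str:
        self.repr_calls += 1
        return "'Unprocessable Entity'"


log = logging.getLogger("lazy_logging_sketch")
log.setLevel(logging.CRITICAL)  # ERROR records are filtered out

lazy = Probe()
# Arguments are passed separately; nothing is formatted for a dropped record.
log.error("Pod creation failed with reason %r. Failing task", lazy)

eager = Probe()
# The f-string is evaluated before log.error() is even called.
log.error(f"Pod creation failed with reason {eager!r}. Failing task")
```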







[GitHub] [airflow] dstandish commented on a change in pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r741645836



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.reason in ("BadRequest", "Unprocessable Entity"):
+                        self.log.error(f"Pod creation failed with reason {e.reason!r}. Failing task")

Review comment:
       and i've updated it, ptal
   







[GitHub] [airflow] dstandish commented on a change in pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r742232036



##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,52 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl
         mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
 
 
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
     """
     Tests if an ApiException from the Kube Client will cause the task to
     be rescheduled.
     """
 
-    def setUp(self) -> None:
+    def setup_method(self) -> None:
         self.kubernetes_executor = KubernetesExecutor()
         self.kubernetes_executor.job_id = "5"
 
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
+    @pytest.mark.parametrize(
+        'reason, status, should_requeue',
+        [
+            ('Forbidden', 403, True),
+            ('fake-unhandled-reason', 12345, True),
+            ('Unprocessable Entity',422, False),
+            ('BadRequest',400, False),
+        ],
+    )
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
-    def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher):
+    def test_run_next_exception_requeue(
+        self, mock_get_kube_client, mock_kubernetes_job_watcher, reason, status, should_requeue

Review comment:
       ok updated







[GitHub] [airflow] dstandish commented on a change in pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r742224124



##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,52 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl
         mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
 
 
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
     """
     Tests if an ApiException from the Kube Client will cause the task to
     be rescheduled.
     """
 
-    def setUp(self) -> None:
+    def setup_method(self) -> None:
         self.kubernetes_executor = KubernetesExecutor()
         self.kubernetes_executor.job_id = "5"
 
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
+    @pytest.mark.parametrize(
+        'reason, status, should_requeue',
+        [
+            ('Forbidden', 403, True),
+            ('fake-unhandled-reason', 12345, True),
+            ('Unprocessable Entity',422, False),
+            ('BadRequest',400, False),
+        ],
+    )
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
-    def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher):
+    def test_run_next_exception_requeue(
+        self, mock_get_kube_client, mock_kubernetes_job_watcher, reason, status, should_requeue

Review comment:
       it provides a human-friendly test name.  this is a pattern i've seen in other airflow tests, though perhaps more commonly the param would be called `name`. wdyt?







[GitHub] [airflow] dstandish commented on a change in pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r741641790



##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,59 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl
         mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
 
 
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
     """
     Tests if an ApiException from the Kube Client will cause the task to
     be rescheduled.
     """
 
-    def setUp(self) -> None:
+    def setup_method(self) -> None:
         self.kubernetes_executor = KubernetesExecutor()
         self.kubernetes_executor.job_id = "5"
 
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
+    @pytest.mark.parametrize(
+        'reason, should_requeue',
+        [
+            ('Forbidden', True),
+            ('fake-unhandled-reason', True),
+            ('Unprocessable Entity', False),
+            ('BadRequest', False),

Review comment:
       yeah that's the right call.  

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.status in (400, 422):
+                        self.log.error("Pod creation failed with reason %r. Failing task", e.reason)

Review comment:
       in this case, the exception is already logged within `run_pod_async`, which catches, logs, and reraises.
   
   ideally, code that immediately reraises shouldn't bother logging (so that logging is controlled by whatever handles the exception).  but since `run_pod_async` already logs here, it is probably better to log only once, because the output can be a bit verbose. lmkyt
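
A small sketch of the duplication being described, using only the standard library. `create_pod` and `sync` are hypothetical stand-ins for a helper that logs and reraises and for the caller that handles the exception; they are not the Airflow functions.

```python
import logging


class ListHandler(logging.Handler):
    """Collects formatted messages so they can be counted."""

    def __init__(self) -> None:
        super().__init__()
        self.messages = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


log = logging.getLogger("log_once_sketch")
log.setLevel(logging.ERROR)
log.propagate = False
handler = ListHandler()
log.addHandler(handler)


def create_pod() -> None:
    # Hypothetical helper that catches, logs, and reraises.
    try:
        raise RuntimeError("Unprocessable Entity")
    except RuntimeError as e:
        log.error("Exception when attempting to create pod: %s", e)
        raise


def sync(log_again: bool) -> None:
    try:
        create_pod()
    except RuntimeError as e:
        if log_again:
            log.error("Pod creation failed with reason %r. Failing task", str(e))


sync(log_again=True)
double_logged = len(handler.messages)  # helper and caller both logged
handler.messages.clear()
sync(log_again=False)
single_logged = len(handler.messages)  # only the helper logged
```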

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,52 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl
         mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
 
 
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
     """
     Tests if an ApiException from the Kube Client will cause the task to
     be rescheduled.
     """
 
-    def setUp(self) -> None:
+    def setup_method(self) -> None:
         self.kubernetes_executor = KubernetesExecutor()
         self.kubernetes_executor.job_id = "5"
 
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
+    @pytest.mark.parametrize(
+        'reason, status, should_requeue',
+        [
+            ('Forbidden', 403, True),
+            ('fake-unhandled-reason', 12345, True),
+            ('Unprocessable Entity',422, False),
+            ('BadRequest',400, False),
+        ],
+    )
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
-    def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher):
+    def test_run_next_exception_requeue(
+        self, mock_get_kube_client, mock_kubernetes_job_watcher, reason, status, should_requeue

Review comment:
       e.g.
   
   ![image](https://user-images.githubusercontent.com/15932138/140171033-cd0b5293-cbb5-4232-aa59-58d88355c287.png)
   

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,52 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl
         mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
 
 
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
     """
     Tests if an ApiException from the Kube Client will cause the task to
     be rescheduled.
     """
 
-    def setUp(self) -> None:
+    def setup_method(self) -> None:
         self.kubernetes_executor = KubernetesExecutor()
         self.kubernetes_executor.job_id = "5"
 
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
+    @pytest.mark.parametrize(
+        'reason, status, should_requeue',
+        [
+            ('Forbidden', 403, True),
+            ('fake-unhandled-reason', 12345, True),
+            ('Unprocessable Entity',422, False),
+            ('BadRequest',400, False),
+        ],
+    )
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
-    def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher):
+    def test_run_next_exception_requeue(
+        self, mock_get_kube_client, mock_kubernetes_job_watcher, reason, status, should_requeue

Review comment:
       though maybe this approach is better 🤷:
   ![image](https://user-images.githubusercontent.com/15932138/140171963-4d869370-f9f5-4894-9663-68a82ec87db8.png)
   

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -225,24 +251,30 @@ def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watc
             kubernetes_executor.start()
             # Execute a task while the Api Throws errors
             try_number = 1
+            task_instance_key = ('dag', 'task', 'run_id', try_number)
             kubernetes_executor.execute_async(
-                key=('dag', 'task', 'run_id', try_number),
+                key=task_instance_key,
                 queue=None,
                 command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
             )
             kubernetes_executor.sync()
             kubernetes_executor.sync()
 
             assert mock_kube_client.create_namespaced_pod.called

Review comment:
       yeah it's odd... 
   
   the test actually calls sync 2 times prior to this line
   
   so depending on the scenario call count could be 2.
   
   but i will remove one of the sync calls and assert == 1...  it seems that must have been a mistake.
   







[GitHub] [airflow] jedcunningham commented on a change in pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r742106170



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.status in (400, 422):

Review comment:
       Might be nice to bring the comment from the tests here as well to explain why we might see these 2 responses.

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -225,24 +251,30 @@ def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watc
             kubernetes_executor.start()
             # Execute a task while the Api Throws errors
             try_number = 1
+            task_instance_key = ('dag', 'task', 'run_id', try_number)
             kubernetes_executor.execute_async(
-                key=('dag', 'task', 'run_id', try_number),
+                key=task_instance_key,
                 queue=None,
                 command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
             )
             kubernetes_executor.sync()
             kubernetes_executor.sync()
 
             assert mock_kube_client.create_namespaced_pod.called
-            assert not kubernetes_executor.task_queue.empty()
 
-            # Disable the ApiException
-            mock_kube_client.create_namespaced_pod.side_effect = None
+            if should_requeue:
+                assert not kubernetes_executor.task_queue.empty()
 
-            # Execute the task without errors should empty the queue
-            kubernetes_executor.sync()
-            assert mock_kube_client.create_namespaced_pod.called
-            assert kubernetes_executor.task_queue.empty()
+                # Disable the ApiException
+                mock_kube_client.create_namespaced_pod.side_effect = None
+
+                # Execute the task without errors should empty the queue
+                kubernetes_executor.sync()

Review comment:
       ```suggestion
                   mock_kube_client.create_namespaced_pod.reset_mock()
                   kubernetes_executor.sync()
   ```
   
   We need to reset the mock, otherwise we aren't checking it was actually called again.
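
To illustrate the point about resetting, here is a standalone sketch with `unittest.mock`. The bare `create_pod` mock is a stand-in for `mock_kube_client.create_namespaced_pod`, not the real test fixture.

```python
from unittest import mock

create_pod = mock.Mock()

create_pod()  # first attempt
# `called` stays True from here on, whether or not a retry ever happens.
stale_called = create_pod.called

create_pod.reset_mock()  # clears call history
called_after_reset = create_pod.called

create_pod()  # the retry we actually want to verify
called_after_retry = create_pod.called
```

By default `reset_mock()` leaves any configured `side_effect` and `return_value` in place, so the test would still disable the exception separately.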

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -225,24 +251,30 @@ def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watc
             kubernetes_executor.start()
             # Execute a task while the Api Throws errors
             try_number = 1
+            task_instance_key = ('dag', 'task', 'run_id', try_number)
             kubernetes_executor.execute_async(
-                key=('dag', 'task', 'run_id', try_number),
+                key=task_instance_key,
                 queue=None,
                 command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
             )
             kubernetes_executor.sync()
             kubernetes_executor.sync()
 
             assert mock_kube_client.create_namespaced_pod.called

Review comment:
       ```suggestion
               mock_kube_client.create_namespaced_pod.assert_called_once()
   ```
   or
   ```suggestion
               assert mock_kube_client.create_namespaced_pod.call_count == 1
   ```
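
For context on why the stricter assertion matters, a sketch assuming the mock ends up being invoked twice (for instance, once per `sync()` call). The bare `create_pod` mock is illustrative only.

```python
from unittest import mock

create_pod = mock.Mock()
create_pod()
create_pod()

# The loose check cannot distinguish one call from two.
loose_check_passes = create_pod.called

# The strict check raises AssertionError unless there was exactly one call.
try:
    create_pod.assert_called_once()
    strict_check_passes = True
except AssertionError:
    strict_check_passes = False

call_count = create_pod.call_count
```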

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,59 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl
         mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
 
 
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
     """
     Tests if an ApiException from the Kube Client will cause the task to
     be rescheduled.
     """
 
-    def setUp(self) -> None:
+    def setup_method(self) -> None:
         self.kubernetes_executor = KubernetesExecutor()
         self.kubernetes_executor.job_id = "5"
 
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
+    @pytest.mark.parametrize(
+        'reason, should_requeue',
+        [
+            ('Forbidden', True),
+            ('fake-unhandled-reason', True),
+            ('Unprocessable Entity', False),
+            ('BadRequest', False),

Review comment:
       I think I'd prefer to see the status code here instead, particularly since we aren't using the `reason` for anything. Then we can also drop the map from inside the test body too.
   
   If we do need both, I think the mapping should happen here anyways.

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.status in (400, 422):
+                        self.log.error("Pod creation failed with reason %r. Failing task", e.reason)

Review comment:
       ```suggestion
                        self.log.error("Failing Task. Pod creation failed with reason: %r, message: %r", e.reason, json.loads(e.body)['message'])
   ```
   
   Does it make sense to include the message too? (Warning: untested.)
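
If the message were included, the body would probably need defensive parsing, since it may be missing or not valid JSON. A minimal sketch, assuming the body is a JSON object with a `message` key as the suggestion implies; `extract_message` is a hypothetical helper, not part of Airflow or the kubernetes client, and the sample payload in the usage note is made up.

```python
import json


def extract_message(body):
    """Best-effort read of the 'message' field from a JSON error body."""
    if not body:
        return None
    try:
        payload = json.loads(body)
    except (TypeError, ValueError):
        # Not a string/bytes value, or not valid JSON.
        return None
    if isinstance(payload, dict):
        return payload.get("message")
    return None
```

For example, `extract_message('{"message": "invalid resources"}')` returns the message text, while `extract_message(None)` and `extract_message("not json")` both return `None` instead of raising inside the exception handler.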

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.status in (400, 422):
+                        self.log.error("Pod creation failed with reason %r. Failing task", e.reason)

Review comment:
       Ah, gotcha, no that makes sense. Thanks.

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,52 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl
         mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
 
 
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
     """
     Tests if an ApiException from the Kube Client will cause the task to
     be rescheduled.
     """
 
-    def setUp(self) -> None:
+    def setup_method(self) -> None:
         self.kubernetes_executor = KubernetesExecutor()
         self.kubernetes_executor.job_id = "5"
 
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
+    @pytest.mark.parametrize(
+        'reason, status, should_requeue',
+        [
+            ('Forbidden', 403, True),
+            ('fake-unhandled-reason', 12345, True),
+            ('Unprocessable Entity',422, False),
+            ('BadRequest',400, False),
+        ],
+    )
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
-    def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher):
+    def test_run_next_exception_requeue(
+        self, mock_get_kube_client, mock_kubernetes_job_watcher, reason, status, should_requeue

Review comment:
       `reason` isn't used, right?

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.status in (400, 422):

Review comment:
       Might be nice to bring the comment from the tests here as well to explain why we might see these 2 responses.

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -225,24 +251,30 @@ def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watc
             kubernetes_executor.start()
             # Execute a task while the Api Throws errors
             try_number = 1
+            task_instance_key = ('dag', 'task', 'run_id', try_number)
             kubernetes_executor.execute_async(
-                key=('dag', 'task', 'run_id', try_number),
+                key=task_instance_key,
                 queue=None,
                 command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
             )
             kubernetes_executor.sync()
             kubernetes_executor.sync()
 
             assert mock_kube_client.create_namespaced_pod.called
-            assert not kubernetes_executor.task_queue.empty()
 
-            # Disable the ApiException
-            mock_kube_client.create_namespaced_pod.side_effect = None
+            if should_requeue:
+                assert not kubernetes_executor.task_queue.empty()
 
-            # Execute the task without errors should empty the queue
-            kubernetes_executor.sync()
-            assert mock_kube_client.create_namespaced_pod.called
-            assert kubernetes_executor.task_queue.empty()
+                # Disable the ApiException
+                mock_kube_client.create_namespaced_pod.side_effect = None
+
+                # Execute the task without errors should empty the queue
+                kubernetes_executor.sync()

Review comment:
       ```suggestion
                   mock_kube_client.create_namespaced_pod.reset_mock()
                   kubernetes_executor.sync()
   ```
   
   We need to reset the mock, otherwise we aren't checking it was actually called again.
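
A minimal sketch of the point about resetting, using only `unittest.mock`. The `client` mock and the `RuntimeError` are stand-ins chosen for illustration; the real test uses the mocked kube client and an `ApiException`.

```python
from unittest import mock

# Stand-in for the mocked kube client (hypothetical, for illustration only).
client = mock.MagicMock()

# First attempt: the API call fails, as in the requeue scenario.
client.create_namespaced_pod.side_effect = RuntimeError("simulated API error")
try:
    client.create_namespaced_pod()
except RuntimeError:
    pass

# The failed attempt is still recorded, so `.called` is already True.
called_before_retry = client.create_namespaced_pod.called

# reset_mock() clears the call history; by default it leaves side_effect alone,
# so the error is disabled separately.
client.create_namespaced_pod.reset_mock()
client.create_namespaced_pod.side_effect = None
called_after_reset = client.create_namespaced_pod.called

# Only now does `.called` tell us something about the retry itself.
client.create_namespaced_pod()
called_after_retry = client.create_namespaced_pod.called
```

Without the reset, an assertion on `.called` after the retry would pass even if the retry never reached the client.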








[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r740794503



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.reason in ("BadRequest", "Unprocessable Entity"):
+                        self.log.error(f"Pod creation failed with reason {e.reason!r}. Failing task")

Review comment:
       I'd prefer that we use `e.status` for this and give different error messages for the 400 (BadRequest) and 422 errors. No strong opinion, though.







[GitHub] [airflow] dstandish commented on a change in pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r742224964



##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,52 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl
         mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
 
 
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
     """
     Tests if an ApiException from the Kube Client will cause the task to
     be rescheduled.
     """
 
-    def setUp(self) -> None:
+    def setup_method(self) -> None:
         self.kubernetes_executor = KubernetesExecutor()
         self.kubernetes_executor.job_id = "5"
 
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
+    @pytest.mark.parametrize(
+        'reason, status, should_requeue',
+        [
+            ('Forbidden', 403, True),
+            ('fake-unhandled-reason', 12345, True),
+            ('Unprocessable Entity',422, False),
+            ('BadRequest',400, False),
+        ],
+    )
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
-    def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher):
+    def test_run_next_exception_requeue(
+        self, mock_get_kube_client, mock_kubernetes_job_watcher, reason, status, should_requeue

Review comment:
       e.g.
   
   ![image](https://user-images.githubusercontent.com/15932138/140171033-cd0b5293-cbb5-4232-aa59-58d88355c287.png)
   







[GitHub] [airflow] uranusjr commented on a change in pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r741568162



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.reason in ("BadRequest", "Unprocessable Entity"):
+                        self.log.error(f"Pod creation failed with reason {e.reason!r}. Failing task")

Review comment:
       One reason (uh) against using `reason` is it's more brittle. `reason` is intended as human readable and could potentially change (e.g. `BadRequest` to `Bad Request`), while the status code is for machines and more suitable for Airflow's use case here IMO.
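
The brittleness argument can be sketched as follows. `FakeApiError` is a hypothetical stand-in that models only the `status` and `reason` attributes, and the reworded reason `"Bad Request"` is the hypothetical change mentioned above, not an observed one.

```python
class FakeApiError(Exception):
    """Hypothetical stand-in for an API error with `status` and `reason`."""

    def __init__(self, status, reason):
        super().__init__(f"({status}) {reason}")
        self.status = status
        self.reason = reason


# Status codes treated as permanent failures in this sketch (400 and 422, as in the diff).
NON_RETRYABLE_STATUSES = frozenset({400, 422})


def should_requeue(error):
    """Return True when the task should be put back on the queue."""
    return error.status not in NON_RETRYABLE_STATUSES


def should_requeue_by_reason(error):
    """Brittle variant: depends on the exact spelling of the reason text."""
    return error.reason not in ("BadRequest", "Unprocessable Entity")


# Same status code, slightly different human-readable wording.
reworded = FakeApiError(400, "Bad Request")
```

With the reworded reason, `should_requeue(reworded)` still returns `False`, while `should_requeue_by_reason(reworded)` returns `True` and would send the task back to the queue.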







[GitHub] [airflow] dstandish commented on a change in pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r742141463



##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,59 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl
         mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
 
 
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
     """
     Tests if an ApiException from the Kube Client will cause the task to
     be rescheduled.
     """
 
-    def setUp(self) -> None:
+    def setup_method(self) -> None:
         self.kubernetes_executor = KubernetesExecutor()
         self.kubernetes_executor.job_id = "5"
 
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
+    @pytest.mark.parametrize(
+        'reason, should_requeue',
+        [
+            ('Forbidden', True),
+            ('fake-unhandled-reason', True),
+            ('Unprocessable Entity', False),
+            ('BadRequest', False),

Review comment:
       yeah that's the right call.  







[GitHub] [airflow] dstandish commented on a change in pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r741641790



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.reason in ("BadRequest", "Unprocessable Entity"):
+                        self.log.error(f"Pod creation failed with reason {e.reason!r}. Failing task")

Review comment:
       yeah i can see that point.  another reason i went with `reason` was ... well... the code was already using `reason` for `BadRequest`.  it also occurred to me that the status codes might have less fidelity than the reasons, i.e. be less precise, so that if we were to change this to status code, we might catch more scenarios in this block than was originally intended. e.g. if 400 can be both `BadRequest` and `LessBadRequest`, we might start catching something we weren't supposed to.
   
   but, i've seen no evidence of this, and since the majority seems to be in favor of status code, i'll make the change.

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.reason in ("BadRequest", "Unprocessable Entity"):
+                        self.log.error(f"Pod creation failed with reason {e.reason!r}. Failing task")

Review comment:
       and i've updated it, ptal
   

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.status in (400, 422):
+                        self.log.error("Pod creation failed with reason %r. Failing task", e.reason)

Review comment:
       in this case, the exception is already logged within `run_pod_async`, which catches, logs, and reraises.
   
   probably anything that is going to immediately reraise shouldn't bother logging (so that the logging is controlled by whatever is handling the exception).  but in this case, that's what the code does so it is probably better to only log once, because it can be a bit verbose. lmkyt
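
The double-logging concern can be sketched with the standard `logging` module. `create_pod` and `sync_once` are hypothetical stand-ins for the log-and-reraise helper and its caller; they are not the actual Airflow functions, and the messages are invented.

```python
import logging


class ListHandler(logging.Handler):
    """Collects log messages so the sketch can count them."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


log = logging.getLogger("double_logging_sketch")
log.setLevel(logging.ERROR)
log.propagate = False
handler = ListHandler()
log.addHandler(handler)


def create_pod():
    # Hypothetical helper that catches, logs, and re-raises.
    try:
        raise ValueError("simulated API failure")
    except ValueError:
        log.error("Exception when attempting to create pod")
        raise


def sync_once():
    try:
        create_pod()
    except ValueError as e:
        # The caller logs again, so one failure produces two records.
        log.error("Pod creation failed: %s", e)


sync_once()
```

After `sync_once()` runs, `handler.messages` holds two entries for a single failure, which is the verbosity the comment wants to avoid.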

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,52 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl
         mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
 
 
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
     """
     Tests if an ApiException from the Kube Client will cause the task to
     be rescheduled.
     """
 
-    def setUp(self) -> None:
+    def setup_method(self) -> None:
         self.kubernetes_executor = KubernetesExecutor()
         self.kubernetes_executor.job_id = "5"
 
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
+    @pytest.mark.parametrize(
+        'reason, status, should_requeue',
+        [
+            ('Forbidden', 403, True),
+            ('fake-unhandled-reason', 12345, True),
+            ('Unprocessable Entity',422, False),
+            ('BadRequest',400, False),
+        ],
+    )
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
-    def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher):
+    def test_run_next_exception_requeue(
+        self, mock_get_kube_client, mock_kubernetes_job_watcher, reason, status, should_requeue

Review comment:
       it provides a human-friendly test name.  this is a pattern i've seen in other airflow tests, though perhaps more commonly the param would be called `name`. wdyt?

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,52 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl
         mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
 
 
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
     """
     Tests if an ApiException from the Kube Client will cause the task to
     be rescheduled.
     """
 
-    def setUp(self) -> None:
+    def setup_method(self) -> None:
         self.kubernetes_executor = KubernetesExecutor()
         self.kubernetes_executor.job_id = "5"
 
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
+    @pytest.mark.parametrize(
+        'reason, status, should_requeue',
+        [
+            ('Forbidden', 403, True),
+            ('fake-unhandled-reason', 12345, True),
+            ('Unprocessable Entity',422, False),
+            ('BadRequest',400, False),
+        ],
+    )
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
-    def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher):
+    def test_run_next_exception_requeue(
+        self, mock_get_kube_client, mock_kubernetes_job_watcher, reason, status, should_requeue

Review comment:
       e.g.
   
   ![image](https://user-images.githubusercontent.com/15932138/140171033-cd0b5293-cbb5-4232-aa59-58d88355c287.png)
   

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,52 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl
         mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
 
 
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
     """
     Tests if an ApiException from the Kube Client will cause the task to
     be rescheduled.
     """
 
-    def setUp(self) -> None:
+    def setup_method(self) -> None:
         self.kubernetes_executor = KubernetesExecutor()
         self.kubernetes_executor.job_id = "5"
 
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
+    @pytest.mark.parametrize(
+        'reason, status, should_requeue',
+        [
+            ('Forbidden', 403, True),
+            ('fake-unhandled-reason', 12345, True),
+            ('Unprocessable Entity',422, False),
+            ('BadRequest',400, False),
+        ],
+    )
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
-    def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher):
+    def test_run_next_exception_requeue(
+        self, mock_get_kube_client, mock_kubernetes_job_watcher, reason, status, should_requeue

Review comment:
       though maybe this approach is better 🤷:
   ![image](https://user-images.githubusercontent.com/15932138/140171963-4d869370-f9f5-4894-9663-68a82ec87db8.png)
   

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,52 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl
         mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
 
 
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
     """
     Tests if an ApiException from the Kube Client will cause the task to
     be rescheduled.
     """
 
-    def setUp(self) -> None:
+    def setup_method(self) -> None:
         self.kubernetes_executor = KubernetesExecutor()
         self.kubernetes_executor.job_id = "5"
 
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
+    @pytest.mark.parametrize(
+        'reason, status, should_requeue',
+        [
+            ('Forbidden', 403, True),
+            ('fake-unhandled-reason', 12345, True),
+            ('Unprocessable Entity',422, False),
+            ('BadRequest',400, False),
+        ],
+    )
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
-    def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher):
+    def test_run_next_exception_requeue(
+        self, mock_get_kube_client, mock_kubernetes_job_watcher, reason, status, should_requeue

Review comment:
       ok updated

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -225,24 +251,30 @@ def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watc
             kubernetes_executor.start()
             # Execute a task while the Api Throws errors
             try_number = 1
+            task_instance_key = ('dag', 'task', 'run_id', try_number)
             kubernetes_executor.execute_async(
-                key=('dag', 'task', 'run_id', try_number),
+                key=task_instance_key,
                 queue=None,
                 command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
             )
             kubernetes_executor.sync()
             kubernetes_executor.sync()
 
             assert mock_kube_client.create_namespaced_pod.called

Review comment:
       yeah it's odd... 
   
   the test actually calls sync 2 times prior to this line
   
   so depending on the scenario call count could be 2.
   
   but i will remove one of the sync calls and assert == 1...  it seems that must have been a mistake.
   







[GitHub] [airflow] jedcunningham closed pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
jedcunningham closed pull request #19359:
URL: https://github.com/apache/airflow/pull/19359


   





[GitHub] [airflow] dstandish commented on a change in pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r741641790



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.reason in ("BadRequest", "Unprocessable Entity"):
+                        self.log.error(f"Pod creation failed with reason {e.reason!r}. Failing task")

Review comment:
       yeah i can see that point.  another reason i went with `reason` was ... well... the code was already using `reason` for `BadRequest`.  And you know it occurred to me the possibility that the status codes have less fidelity than the reasons, i.e. possibly less precise, so that if we were to change this to status code, perhaps we would catch more scenarios in this block than was originally intended. e.g if 400 can be `BadRequest` and `LessBadRequest`, we might start catching something we weren't supposed to.
   
   but, i've seen no evidence of this, and since the majority seems to be in favor of status code, i'll make the change.
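
The fidelity concern above can be sketched roughly as follows. This is only an illustration, not the executor code: `FakeApiException` is a hypothetical stand-in for the Kubernetes client's `ApiException`, `should_fail_immediately` is an assumed helper name, and `LessBadRequest` is the made-up reason from the comment.

```python
class FakeApiException(Exception):
    """Hypothetical stand-in that carries only `status` and `reason`."""

    def __init__(self, status, reason):
        super().__init__(f"({status}) {reason}")
        self.status = status
        self.reason = reason


# Status codes treated as non-retryable in this sketch.
NON_RETRYABLE_STATUSES = (400, 422)


def should_fail_immediately(exc):
    """Return True when the task should be failed rather than requeued."""
    return exc.status in NON_RETRYABLE_STATUSES


# A 400 with a different (made-up) reason is caught too, because only
# the status code is inspected.
broader_match = should_fail_immediately(FakeApiException(400, "LessBadRequest"))
```

Matching on `status` casts a wider net than matching on `reason`: every 400 fails the task, whatever its reason text.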

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.reason in ("BadRequest", "Unprocessable Entity"):
+                        self.log.error(f"Pod creation failed with reason {e.reason!r}. Failing task")

Review comment:
       and i've updated it, ptal
   

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,59 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl
         mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
 
 
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
     """
     Tests if an ApiException from the Kube Client will cause the task to
     be rescheduled.
     """
 
-    def setUp(self) -> None:
+    def setup_method(self) -> None:
         self.kubernetes_executor = KubernetesExecutor()
         self.kubernetes_executor.job_id = "5"
 
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
+    @pytest.mark.parametrize(
+        'reason, should_requeue',
+        [
+            ('Forbidden', True),
+            ('fake-unhandled-reason', True),
+            ('Unprocessable Entity', False),
+            ('BadRequest', False),

Review comment:
       yeah that's the right call.  

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.status in (400, 422):
+                        self.log.error("Pod creation failed with reason %r. Failing task", e.reason)

Review comment:
       in this case, the exception is already logged within `run_pod_async`, which catches, logs, and reraises.
   
   probably anything that is going to immediately reraise shouldn't bother logging (so that the logging is controlled by whatever is handling the exception).  but in this case, that's what the code does, so it is probably better to only log once, because it can be a bit verbose. let me know your thoughts.
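
A minimal sketch of the "log once" idea, assuming an inner function that logs the traceback and re-raises while the caller adds only a one-line message. The names `create_pod`, `sync`, and `ListHandler` are hypothetical and exist only for this illustration; they are not the Airflow functions.

```python
import logging

log = logging.getLogger("log_once_sketch")
log.propagate = False


class ListHandler(logging.Handler):
    """Collects records so the sketch can count them."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


handler = ListHandler()
log.addHandler(handler)


def create_pod():
    """Hypothetical inner call: logs the traceback, then re-raises."""
    try:
        raise ValueError("pod spec rejected")
    except ValueError:
        log.exception("Exception when attempting to create pod")
        raise


def sync():
    """Hypothetical caller: adds one short line, no second traceback."""
    try:
        create_pod()
    except ValueError as e:
        log.error("Pod creation failed with reason %r. Failing task", str(e))
        return "failed"
    return "ok"


result = sync()
tracebacks = [r for r in handler.records if r.exc_info]
```

Only the inner record carries `exc_info`, so the traceback appears once even though two log lines are written.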

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,52 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl
         mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
 
 
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
     """
     Tests if an ApiException from the Kube Client will cause the task to
     be rescheduled.
     """
 
-    def setUp(self) -> None:
+    def setup_method(self) -> None:
         self.kubernetes_executor = KubernetesExecutor()
         self.kubernetes_executor.job_id = "5"
 
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
+    @pytest.mark.parametrize(
+        'reason, status, should_requeue',
+        [
+            ('Forbidden', 403, True),
+            ('fake-unhandled-reason', 12345, True),
+            ('Unprocessable Entity',422, False),
+            ('BadRequest',400, False),
+        ],
+    )
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
-    def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher):
+    def test_run_next_exception_requeue(
+        self, mock_get_kube_client, mock_kubernetes_job_watcher, reason, status, should_requeue

Review comment:
       it provides a human-friendly test name.  this is a pattern i've seen in other airflow tests, though perhaps more commonly the param would be called `name`. wdyt?

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,52 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl
         mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
 
 
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
     """
     Tests if an ApiException from the Kube Client will cause the task to
     be rescheduled.
     """
 
-    def setUp(self) -> None:
+    def setup_method(self) -> None:
         self.kubernetes_executor = KubernetesExecutor()
         self.kubernetes_executor.job_id = "5"
 
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
+    @pytest.mark.parametrize(
+        'reason, status, should_requeue',
+        [
+            ('Forbidden', 403, True),
+            ('fake-unhandled-reason', 12345, True),
+            ('Unprocessable Entity',422, False),
+            ('BadRequest',400, False),
+        ],
+    )
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
-    def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher):
+    def test_run_next_exception_requeue(
+        self, mock_get_kube_client, mock_kubernetes_job_watcher, reason, status, should_requeue

Review comment:
       e.g.
   
   ![image](https://user-images.githubusercontent.com/15932138/140171033-cd0b5293-cbb5-4232-aa59-58d88355c287.png)
   

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,52 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl
         mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
 
 
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
     """
     Tests if an ApiException from the Kube Client will cause the task to
     be rescheduled.
     """
 
-    def setUp(self) -> None:
+    def setup_method(self) -> None:
         self.kubernetes_executor = KubernetesExecutor()
         self.kubernetes_executor.job_id = "5"
 
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
+    @pytest.mark.parametrize(
+        'reason, status, should_requeue',
+        [
+            ('Forbidden', 403, True),
+            ('fake-unhandled-reason', 12345, True),
+            ('Unprocessable Entity',422, False),
+            ('BadRequest',400, False),
+        ],
+    )
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
-    def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher):
+    def test_run_next_exception_requeue(
+        self, mock_get_kube_client, mock_kubernetes_job_watcher, reason, status, should_requeue

Review comment:
       though maybe this approach is better 🤷:
   ![image](https://user-images.githubusercontent.com/15932138/140171963-4d869370-f9f5-4894-9663-68a82ec87db8.png)
   

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,52 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl
         mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
 
 
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
     """
     Tests if an ApiException from the Kube Client will cause the task to
     be rescheduled.
     """
 
-    def setUp(self) -> None:
+    def setup_method(self) -> None:
         self.kubernetes_executor = KubernetesExecutor()
         self.kubernetes_executor.job_id = "5"
 
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
+    @pytest.mark.parametrize(
+        'reason, status, should_requeue',
+        [
+            ('Forbidden', 403, True),
+            ('fake-unhandled-reason', 12345, True),
+            ('Unprocessable Entity',422, False),
+            ('BadRequest',400, False),
+        ],
+    )
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
-    def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher):
+    def test_run_next_exception_requeue(
+        self, mock_get_kube_client, mock_kubernetes_job_watcher, reason, status, should_requeue

Review comment:
       ok updated

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -225,24 +251,30 @@ def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watc
             kubernetes_executor.start()
             # Execute a task while the Api Throws errors
             try_number = 1
+            task_instance_key = ('dag', 'task', 'run_id', try_number)
             kubernetes_executor.execute_async(
-                key=('dag', 'task', 'run_id', try_number),
+                key=task_instance_key,
                 queue=None,
                 command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
             )
             kubernetes_executor.sync()
             kubernetes_executor.sync()
 
             assert mock_kube_client.create_namespaced_pod.called

Review comment:
       yeah it's odd... 
   
   the test actually calls sync 2 times prior to this line
   
   so depending on the scenario call count could be 2.
   
   but i will remove one of the sync calls and assert == 1...  it seems that must have been a mistake.
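
The difference between asserting `.called` and asserting an exact count can be sketched with `unittest.mock`. The `sync` function here is a hypothetical stand-in for one executor sync and is not the real method.

```python
from unittest import mock

client = mock.Mock()


def sync(kube_client):
    """Hypothetical stand-in for one sync that tries to create a pod."""
    kube_client.create_namespaced_pod(body={}, namespace="default")


sync(client)

# `.called` is True after one call or after many, so it cannot detect
# an unexpected extra attempt.
was_called = client.create_namespaced_pod.called
# `call_count` pins the exact number of attempts.
attempts = client.create_namespaced_pod.call_count
```

With a single `sync()` the count is 1; a second `sync()` would leave `.called` unchanged but raise `call_count` to 2.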
   







[GitHub] [airflow] dstandish commented on a change in pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r741325058



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.reason in ("BadRequest", "Unprocessable Entity"):
+                        self.log.error(f"Pod creation failed with reason {e.reason!r}. Failing task")

Review comment:
       Do you mean we should use the status code instead of the reason text?
   
   It looks like we could do that:
   
   ```
           - 403 Forbidden will be returned when your request exceeds namespace quota.
           - 422 Unprocessable Entity is returned when your parameters are valid but unsupported
               e.g. limits lower than requests.
           - 400 BadRequest is returned when your parameters are invalid e.g. asking for cpu=100ABC123.
   ```
   
   I guess one reason to use `reason` in the if statement, though, is that it's English.  So we can read
   
   ```
   if e.reason in ("BadRequest", "Unprocessable Entity"):
   ```
   
   rather than
   
   ```
   if e.status in (400, 422):
   ```
   
   because the former is more descriptive / self-documenting.  
   
   But I also don't have a strong opinion on this.
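
One possible middle ground, sketched here as an assumption rather than something proposed in this thread, is to compare against the named constants in the standard library's `http.HTTPStatus`. The check is still on the numeric status, but it reads like the reason text. `is_non_retryable` is a hypothetical helper name.

```python
from http import HTTPStatus

# HTTPStatus members are IntEnum values, so they compare equal to plain ints.
NON_RETRYABLE = (HTTPStatus.BAD_REQUEST, HTTPStatus.UNPROCESSABLE_ENTITY)


def is_non_retryable(status):
    """`status` is a plain int, such as the value of `e.status`."""
    return status in NON_RETRYABLE
```

Because the members are integers, no conversion of `e.status` is needed before the membership test.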







[GitHub] [airflow] kaxil commented on a change in pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r741511783



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.reason in ("BadRequest", "Unprocessable Entity"):
+                        self.log.error(f"Pod creation failed with reason {e.reason!r}. Failing task")

Review comment:
       ```suggestion
                       if e.reason in ("BadRequest", "Unprocessable Entity"):
                           self.log.error("Pod creation failed with reason %r. Failing task", e.reason)
   ```
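
The suggestion above swaps an f-string for `%`-style arguments. A likely motivation, stated here as an assumption since the comment gives none, is that the `logging` module defers formatting until a record is actually emitted. A small sketch with a hypothetical `CountingRepr` class shows the difference:

```python
import logging


class CountingRepr:
    """Hypothetical helper that counts how often repr() is invoked."""

    calls = 0

    def __repr__(self):
        CountingRepr.calls += 1
        return "CountingRepr()"


log = logging.getLogger("lazy_format_sketch")
log.setLevel(logging.ERROR)
log.propagate = False
log.addHandler(logging.NullHandler())

obj = CountingRepr()

# %-style arguments: DEBUG is disabled, so the message is never formatted.
log.debug("value is %r", obj)
lazy_calls = CountingRepr.calls

# f-string: the string is built before logging decides whether to emit it.
log.debug(f"value is {obj!r}")
eager_calls = CountingRepr.calls - lazy_calls
```

For an `error` call that is almost always emitted, the saving is small; the change mostly keeps logging calls consistent.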
   







[GitHub] [airflow] github-actions[bot] commented on pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#issuecomment-959863261









[GitHub] [airflow] kaxil merged pull request #19359: Task should fail immediately when pod is unprocessable

Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #19359:
URL: https://github.com/apache/airflow/pull/19359





