Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/12 23:14:53 UTC

[GitHub] [airflow] jedcunningham commented on a change in pull request #15336: Fail task when containers inside a pod fails

jedcunningham commented on a change in pull request #15336:
URL: https://github.com/apache/airflow/pull/15336#discussion_r611981896



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -218,6 +239,34 @@ def process_status(
                 resource_version,
             )
 
+    def process_container_statuses(
+        self,
+        pod_id: str,
+        statuses: List[Any],
+        namespace: str,
+        annotations: Dict[str, str],
+        resource_version: str,
+    ):
+        """Monitor pod container statuses"""
+        for container_status in statuses:
+            terminated = container_status.state.terminated
+            waiting = container_status.state.waiting
+            if terminated:
+                self.log.debug(
+                    "A container in the pod %s has terminated, reason: %s, message: %s",
+                    pod_id,
+                    terminated.reason,
+                    terminated.message,
+                )
+                self.watcher_queue.put((pod_id, namespace, State.FAILED, annotations, resource_version))

Review comment:
       I don't think we should be adding to `watcher_queue` more than once, right? It might be better to leave the queue handling to `process_status` and have this just return a bool; that also means less to cart around. Maybe something like this:
   
   ```
   def _has_terminated_containers(self, status: V1PodStatus) -> bool:
       return any(cs.state.terminated for cs in status.container_statuses or [])
   ```

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -187,25 +188,45 @@ def process_status(
         self,
         pod_id: str,
         namespace: str,
-        status: str,
+        status: Any,

Review comment:
       ```suggestion
           status: k8s.V1PodStatus,
   ```

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -507,3 +506,113 @@ def test_process_status_catchall(self):
 
         self._run()
         self.watcher.watcher_queue.put.assert_not_called()
+
+    def test_container_status_of_terminating_fails_pod(self):
+        self.pod.status.phase = "Pending"
+        self.pod.status.container_statuses = [
+            k8s.V1ContainerStatus(
+                container_id=None,
+                image="apache/airflow:2.0.1-python3.8",
+                image_id="",
+                name="base",
+                ready="false",
+                restart_count=0,
+                state=k8s.V1ContainerState(
+                    terminated=k8s.V1ContainerStateTerminated(
+                        reason="Terminating", exit_code=1

Review comment:
       Have you seen, or can you recreate, a pod with both `phase=Pending` and `state.terminated`? I don't see how it is possible to have both.
   
   I've tried a few scenarios with both init containers and sidecars, and in every case the watcher marked the pod as failed (though maybe not immediately, because the pod was still in `phase=Running`); however, the TI still gets marked as success.
   
   In other words, I think there are bugs around here, but I'm not convinced that looking at pods in `phase=Pending` will help.
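To illustrate why a `phase=Pending` check misses these cases, here is a hypothetical helper that looks for containers that exited with a non-zero code in any phase, covering init containers as well as sidecars. The dataclasses are stand-ins for the Kubernetes client models, and treating a non-zero `exit_code` as failure is an assumption; an init container that completes successfully terminates with exit code 0.

```python
from dataclasses import dataclass
from typing import List, Optional


# Hypothetical stand-ins for the Kubernetes client models; only the
# attributes read below are mirrored.
@dataclass
class Terminated:
    exit_code: int


@dataclass
class ContainerState:
    terminated: Optional[Terminated] = None


@dataclass
class ContainerStatus:
    name: str
    state: ContainerState


@dataclass
class PodStatus:
    phase: str
    init_container_statuses: Optional[List[ContainerStatus]] = None
    container_statuses: Optional[List[ContainerStatus]] = None


def failed_containers(status: PodStatus) -> List[str]:
    """Names of init/regular containers that exited non-zero, in any phase."""
    statuses = (status.init_container_statuses or []) + (status.container_statuses or [])
    return [
        cs.name
        for cs in statuses
        # An init container that finished with exit code 0 is normal, not a failure.
        if cs.state.terminated is not None and cs.state.terminated.exit_code != 0
    ]


# A sidecar that crashed while the pod is still Running: a check gated on
# phase == "Pending" would never look at this status.
status = PodStatus(
    phase="Running",
    init_container_statuses=[ContainerStatus("init", ContainerState(Terminated(0)))],
    container_statuses=[
        ContainerStatus("base", ContainerState()),
        ContainerStatus("sidecar", ContainerState(Terminated(1))),
    ],
)
print(failed_containers(status))  # ['sidecar']
```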




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org