Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2023/01/12 01:07:56 UTC

[GitHub] [airflow] jedcunningham opened a new pull request, #28871: KubenetesExecutor sends state even when successful

jedcunningham opened a new pull request, #28871:
URL: https://github.com/apache/airflow/pull/28871

   Like the other executors, KubernetesExecutor should send the "worker
   state" back to the result buffer. This makes it more consistent with
   the other executors, particularly around logging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] dstandish commented on a diff in pull request #28871: KubenetesExecutor sends state even when successful

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28871:
URL: https://github.com/apache/airflow/pull/28871#discussion_r1068629030


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -763,13 +763,14 @@ def _change_state(self, key: TaskInstanceKey, state: str | None, pod_id: str, na
             self.kube_scheduler.patch_pod_executor_done(pod_id=pod_id, namespace=namespace)
             self.log.info("Patched pod %s in namespace %s to mark it as done", str(key), str(namespace))
 
-        if key in self.running:
+        try:
             self.running.remove(key)
-            # We do get multiple events once the pod hits a terminal state, and we only want to
+        except KeyError:
+            self.log.debug("TI key not in running, not adding to event_buffer: %s", key)
+        else:
+            # We get multiple events once the pod hits a terminal state, and we only want to

Review Comment:
   I think this changes the behavior.
   
   With try/except/else, if no error is raised (key is in running), the else block is entered.
   
   Previously, if key was in running, we did not hit the else. Is that desired?
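
For reference, a minimal sketch comparing the two forms discussed in this thread, assuming `running` is a plain Python `set` (so `remove` raises `KeyError` for a missing key). The function names, the string keys, and the string states are hypothetical stand-ins, not Airflow's API. In the try/except/else form, the `else` clause runs only when `remove` succeeded, which is the same condition as the body of `if key in running`.

```python
import logging

log = logging.getLogger(__name__)


def finish_with_membership_check(running: set, event_buffer: dict, key: str, state: str) -> None:
    """if/else form: check membership first, then remove (hypothetical helper)."""
    if key in running:
        running.remove(key)
        event_buffer[key] = (state, None)
    else:
        log.debug("TI key not in running, not adding to event_buffer: %s", key)


def finish_with_try_else(running: set, event_buffer: dict, key: str, state: str) -> None:
    """try/except/else form: remove, and treat KeyError as 'not running' (hypothetical helper)."""
    try:
        running.remove(key)
    except KeyError:
        log.debug("TI key not in running, not adding to event_buffer: %s", key)
    else:
        # Runs only when remove() did not raise, i.e. the key was in running.
        event_buffer[key] = (state, None)


def run(finish) -> tuple:
    """Feed the same sequence of events through one of the two forms."""
    running = {"ti-1"}
    event_buffer = {}
    finish(running, event_buffer, "ti-1", "success")  # first terminal event: recorded
    finish(running, event_buffer, "ti-1", "failed")   # repeated terminal event: ignored
    finish(running, event_buffer, "ti-2", "success")  # key was never running: ignored
    return running, event_buffer
```

Under these assumptions both forms leave `running` and `event_buffer` in the same state for the same sequence of events; the `else` of the try statement corresponds to the `if` branch of the earlier code, not to its `else` branch.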





[GitHub] [airflow] dstandish commented on a diff in pull request #28871: KubenetesExecutor sends state even when successful

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28871:
URL: https://github.com/apache/airflow/pull/28871#discussion_r1068636272


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -763,13 +763,14 @@ def _change_state(self, key: TaskInstanceKey, state: str | None, pod_id: str, na
             self.kube_scheduler.patch_pod_executor_done(pod_id=pod_id, namespace=namespace)
             self.log.info("Patched pod %s in namespace %s to mark it as done", str(key), str(namespace))
 
-        if key in self.running:
+        try:
             self.running.remove(key)
-            # We do get multiple events once the pod hits a terminal state, and we only want to
+        except KeyError:
+            self.log.debug("TI key not in running, not adding to event_buffer: %s", key)
+        else:
+            # We get multiple events once the pod hits a terminal state, and we only want to

Review Comment:
   Oh, that was only in the context of the single commit. I see that it was more of a reversion.





[GitHub] [airflow] uranusjr commented on a diff in pull request #28871: KubenetesExecutor sends state even when successful

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #28871:
URL: https://github.com/apache/airflow/pull/28871#discussion_r1067709600


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -751,19 +751,25 @@ def _change_state(self, key: TaskInstanceKey, state: str | None, pod_id: str, na
         if TYPE_CHECKING:
             assert self.kube_scheduler
 
-        if state != State.RUNNING:
-            if self.kube_config.delete_worker_pods:
-                if state != State.FAILED or self.kube_config.delete_worker_pods_on_failure:
-                    self.kube_scheduler.delete_pod(pod_id, namespace)
-                    self.log.info("Deleted pod: %s in namespace %s", str(key), str(namespace))
-            else:
-                self.kube_scheduler.patch_pod_executor_done(pod_id=pod_id, namespace=namespace)
-                self.log.info("Patched pod %s in namespace %s to mark it as done", str(key), str(namespace))
-            try:
-                self.running.remove(key)
-            except KeyError:
-                self.log.debug("Could not find key: %s", str(key))
-        self.event_buffer[key] = state, None
+        if state == State.RUNNING:
+            self.event_buffer[key] = state, None
+            return
+
+        if self.kube_config.delete_worker_pods:
+            if state != State.FAILED or self.kube_config.delete_worker_pods_on_failure:
+                self.kube_scheduler.delete_pod(pod_id, namespace)
+                self.log.info("Deleted pod: %s in namespace %s", str(key), str(namespace))
+        else:
+            self.kube_scheduler.patch_pod_executor_done(pod_id=pod_id, namespace=namespace)
+            self.log.info("Patched pod %s in namespace %s to mark it as done", str(key), str(namespace))
+
+        if key in self.running:
+            self.running.remove(key)
+            # We do get multiple events once the pod hits a terminal state, and we only want to
+            # do this once, so only do it when we remove the task from running
+            self.event_buffer[key] = state, None
+        else:
+            self.log.debug("TI key not in running, not adding to event_buffer: %s", str(key))

Review Comment:
   ```suggestion
           try:
               self.running.remove(key)
           except KeyError:
               self.log.debug("TI key not in running, not adding to event_buffer: %s", key)
           else:
               # We get multiple events once the pod hits a terminal state, and we only want to
               # do this once, so only do it when we remove the task from running
               self.event_buffer[key] = state, None
   ```
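
As a rough illustration of the control flow in the hunk above, here is a toy model that can be run without Kubernetes or Airflow. Everything in it is an assumption made for illustration: `ToyExecutor`, the string states, and the `pod_actions` list (which stands in for the `delete_pod` and `patch_pod_executor_done` calls) are hypothetical and are not part of the real executor.

```python
from dataclasses import dataclass, field

# Hypothetical stand-ins for the State constants used in the diff.
RUNNING, SUCCESS, FAILED = "running", "success", "failed"


@dataclass
class ToyExecutor:
    """Toy model of the _change_state flow; not the real KubernetesExecutor."""

    delete_worker_pods: bool = True
    delete_worker_pods_on_failure: bool = False
    running: set = field(default_factory=set)
    event_buffer: dict = field(default_factory=dict)
    pod_actions: list = field(default_factory=list)  # records what would be done to pods

    def change_state(self, key: str, state: str, pod_id: str) -> None:
        # RUNNING is reported immediately; the pod is left alone.
        if state == RUNNING:
            self.event_buffer[key] = (state, None)
            return

        # Terminal state: delete the pod, or mark it done, depending on config.
        if self.delete_worker_pods:
            if state != FAILED or self.delete_worker_pods_on_failure:
                self.pod_actions.append(("delete", pod_id))
        else:
            self.pod_actions.append(("patch_done", pod_id))

        # Report the terminal state once, only when the key leaves running.
        try:
            self.running.remove(key)
        except KeyError:
            pass  # repeated terminal event; nothing is added to the buffer
        else:
            self.event_buffer[key] = (state, None)


def demo() -> ToyExecutor:
    ex = ToyExecutor(running={"ti-1", "ti-2"})
    ex.change_state("ti-1", RUNNING, "pod-1")
    ex.change_state("ti-1", SUCCESS, "pod-1")
    ex.change_state("ti-1", SUCCESS, "pod-1")  # repeated terminal event
    ex.change_state("ti-2", FAILED, "pod-2")   # failed pod is kept with these settings
    return ex
```

In this sketch a successful task's state reaches `event_buffer` exactly once even when the terminal event is delivered twice, while the pod cleanup step still runs for every terminal event, as it does in the hunk.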





[GitHub] [airflow] uranusjr merged pull request #28871: KubenetesExecutor sends state even when successful

Posted by GitBox <gi...@apache.org>.
uranusjr merged PR #28871:
URL: https://github.com/apache/airflow/pull/28871

