You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "moiseenkov (via GitHub)" <gi...@apache.org> on 2023/02/13 17:30:33 UTC

[GitHub] [airflow] moiseenkov commented on a diff in pull request #28336: Fixed hanged KubernetesPodOperator

moiseenkov commented on code in PR #28336:
URL: https://github.com/apache/airflow/pull/28336#discussion_r1104811220


##########
airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -91,6 +99,77 @@ def get_container_termination_message(pod: V1Pod, container_name: str):
         return container_status.state.terminated.message if container_status else None
 
 
+class PodLogsConsumer:
+    """
+    PodLogsConsumer pulls pod logs from a stream, checking the container status before
+    reading data.
+    This class is a workaround for the issue https://github.com/apache/airflow/issues/23497
+
+    :meta private:
+    """
+
+    def __init__(
+        self,
+        response: HTTPResponse,
+        pod: V1Pod,
+        pod_manager: PodManager,
+        container_name: str,
+        post_termination_timeout: int = 120,
+        read_pod_cache_timeout: int = 120,
+    ):
+        """
+        Store the log stream and the pod details needed while consuming it.
+
+        :param response: HTTP response whose body is streamed by ``__iter__``.
+        :param pod: the pod object the logs belong to.
+        :param pod_manager: pod manager instance; only stored here.
+        :param container_name: name of the container whose logs are read.
+        :param post_termination_timeout: stored as-is; presumably a number of seconds to keep
+            reading after the container terminates -- its use is outside this excerpt, confirm
+            against the rest of the class.
+        :param read_pod_cache_timeout: stored as-is; presumably the lifetime in seconds of the
+            cached pod read below -- its use is outside this excerpt, confirm against the rest
+            of the class.
+        """
+        self.response = response
+        self.pod = pod
+        self.pod_manager = pod_manager
+        self.container_name = container_name
+        self.post_termination_timeout = post_termination_timeout
+        # Cache state starts empty; both attributes are populated elsewhere in the class
+        # (not visible in this excerpt).
+        self.last_read_pod_at = None
+        self.read_pod_cache = None
+        self.read_pod_cache_timeout = read_pod_cache_timeout
+
+    def __iter__(self) -> Generator[bytes, None, None]:
+        """
+        Yield the streamed log data one line at a time.
+
+        Chunks read from ``self.response.stream`` are split on the newline byte and re-assembled,
+        so every yielded item except possibly the last one ends with a newline. Reading only
+        starts if ``self.logs_available()`` is true, and stops as soon as it becomes false after
+        a chunk has been processed. ``logs_available`` is defined outside this excerpt.
+
+        :return: generator of log lines as bytes; a trailing partial line (no newline) is
+            yielded last if any data is still buffered.
+        """
+        # Pieces of the current line that has not seen its terminating newline yet.
+        messages: list[bytes] = []
+        if self.logs_available():
+            for chunk in self.response.stream(amt=None, decode_content=True):
+                if b"\n" in chunk:
+                    chunks = chunk.split(b"\n")
+                    # The first piece completes whatever was buffered from earlier chunks.
+                    yield b"".join(messages) + chunks[0] + b"\n"
+                    # Middle pieces are complete lines on their own.
+                    for x in chunks[1:-1]:
+                        yield x + b"\n"
+                    # The last piece is the start of the next line; it is empty when the chunk
+                    # ended exactly on a newline.
+                    if chunks[-1]:
+                        messages = [chunks[-1]]
+                    else:
+                        messages = []
+                else:
+                    # No newline in this chunk: keep buffering the current line.
+                    messages.append(chunk)
+                # Re-check after every chunk so the loop does not keep waiting on the stream
+                # once logs are reported as no longer available.
+                if not self.logs_available():
+                    break
+        # Flush the remaining partial line, without appending a newline.
+        if messages:
+            yield b"".join(messages)

Review Comment:
   Yes, thanks for noticing. I refactored this method to make it more readable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org