Posted to commits@airflow.apache.org by po...@apache.org on 2023/03/04 18:08:18 UTC

[airflow] branch main updated: Fixed hanged KubernetesPodOperator (#28336)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d2face107 Fixed hanged KubernetesPodOperator (#28336)
6d2face107 is described below

commit 6d2face107f24b7e7dce4b98ae3def1178e1fc4c
Author: max <42...@users.noreply.github.com>
AuthorDate: Sat Mar 4 19:08:09 2023 +0100

    Fixed hanged KubernetesPodOperator (#28336)
    
    Co-authored-by: Maksim Moiseenkov <mo...@google.com>
---
 .../cncf/kubernetes/operators/kubernetes_pod.py    |   2 +
 .../providers/cncf/kubernetes/utils/pod_manager.py | 140 ++++++++++++-
 .../cncf/kubernetes/utils/test_pod_manager.py      | 226 +++++++++++++++++++--
 3 files changed, 343 insertions(+), 25 deletions(-)
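
In short: read_pod_logs() no longer hands back the raw log stream from read_namespaced_pod_log(). It now wraps the urllib3 response in a new PodLogsConsumer, which re-checks the container status while streaming and stops reading at most post_termination_timeout seconds after the container terminates; the class docstring labels this a workaround for https://github.com/apache/airflow/issues/23497. Below is a minimal, self-contained sketch of that consumption pattern, independent of Kubernetes. The names here (consume_log_chunks, is_container_running, finished_at) are illustrative only and do not appear in the commit; the real implementation is PodLogsConsumer in pod_manager.py further down.

from datetime import datetime, timedelta, timezone


def consume_log_chunks(chunks, is_container_running, finished_at, post_termination_timeout=120):
    """Yield newline-terminated log lines, but stop once the container has been
    terminated for longer than post_termination_timeout seconds."""

    def logs_available():
        if is_container_running():
            return True
        done_at = finished_at()
        if done_at is None:
            return False
        return done_at + timedelta(seconds=post_termination_timeout) > datetime.now(timezone.utc)

    buffer = b""
    for chunk in chunks:
        buffer += chunk
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            yield line + b"\n"
        if not logs_available():
            # The container is gone and the grace period has passed: stop instead of
            # blocking on a stream that will not produce more data.
            break
    if buffer:
        yield buffer


if __name__ == "__main__":
    done_at = datetime.now(timezone.utc)
    lines = consume_log_chunks(
        chunks=iter([b"first\nsec", b"ond\n", b"tail"]),
        is_container_running=lambda: False,
        finished_at=lambda: done_at,
    )
    print(list(lines))  # [b'first\n', b'second\n', b'tail']
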

diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index bf889580bc..711607bbe4 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -225,6 +225,7 @@ class KubernetesPodOperator(BaseOperator):
     BASE_CONTAINER_NAME = "base"
 
     POD_CHECKED_KEY = "already_checked"
+    POST_TERMINATION_TIMEOUT = 120
 
     template_fields: Sequence[str] = (
         "image",
@@ -531,6 +532,7 @@ class KubernetesPodOperator(BaseOperator):
                     pod=self.pod,
                     container_name=self.base_container_name,
                     follow=True,
+                    post_termination_timeout=self.POST_TERMINATION_TIMEOUT,
                 )
             else:
                 self.pod_manager.await_container_completion(
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 56ef95eef0..3345c63cc8 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -23,23 +23,26 @@ import time
 import warnings
 from contextlib import closing, suppress
 from dataclasses import dataclass
-from datetime import datetime
-from typing import TYPE_CHECKING, Iterable, cast
+from datetime import datetime, timedelta
+from typing import TYPE_CHECKING, Generator, cast
 
 import pendulum
 import tenacity
 from kubernetes import client, watch
+from kubernetes.client.models.v1_container_status import V1ContainerStatus
 from kubernetes.client.models.v1_pod import V1Pod
 from kubernetes.client.rest import ApiException
 from kubernetes.stream import stream as kubernetes_stream
 from pendulum import DateTime
 from pendulum.parsing.exceptions import ParserError
 from urllib3.exceptions import HTTPError as BaseHTTPError
+from urllib3.response import HTTPResponse
 
 from airflow.exceptions import AirflowException
 from airflow.kubernetes.kube_client import get_kube_client
 from airflow.kubernetes.pod_generator import PodDefaults
 from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.timezone import utcnow
 
 if TYPE_CHECKING:
     from kubernetes.client.models.core_v1_event_list import CoreV1EventList
@@ -70,15 +73,24 @@ class PodPhase:
     terminal_states = {FAILED, SUCCEEDED}
 
 
+def get_container_status(pod: V1Pod, container_name: str) -> V1ContainerStatus | None:
+    """Retrieves container status"""
+    container_statuses = pod.status.container_statuses if pod and pod.status else None
+    if container_statuses:
+        # In general the variable container_statuses can store multiple items matching different containers.
+        # The following generator expression yields all items that have name equal to the container_name.
+        # The function next() here calls the generator to get only the first value. If there's nothing found
+        # then None is returned.
+        return next((x for x in container_statuses if x.name == container_name), None)
+    return None
+
+
 def container_is_running(pod: V1Pod, container_name: str) -> bool:
     """
     Examines V1Pod ``pod`` to determine whether ``container_name`` is running.
     If that container is present and running, returns True.  Returns False otherwise.
     """
-    container_statuses = pod.status.container_statuses if pod and pod.status else None
-    if not container_statuses:
-        return False
-    container_status = next((x for x in container_statuses if x.name == container_name), None)
+    container_status = get_container_status(pod, container_name)
     if not container_status:
         return False
     return container_status.state.running is not None
@@ -91,6 +103,92 @@ def get_container_termination_message(pod: V1Pod, container_name: str):
         return container_status.state.terminated.message if container_status else None
 
 
+class PodLogsConsumer:
+    """
+    PodLogsConsumer is responsible for pulling pod logs from a stream with checking a container status before
+    reading data.
+    This class is a workaround for the issue https://github.com/apache/airflow/issues/23497
+
+    :param response: HTTP response with logs
+    :param pod: Pod instance from Kubernetes client
+    :param pod_manager: Pod manager instance
+    :param container_name: Name of the container that we're reading logs from
+    :param post_termination_timeout: (Optional) The period of time in seconds representing for how long time
+        logs are available after the container termination.
+    :param read_pod_cache_timeout: (Optional) The container's status cache lifetime.
+        The container status is cached to reduce API calls.
+
+    :meta private:
+    """
+
+    def __init__(
+        self,
+        response: HTTPResponse,
+        pod: V1Pod,
+        pod_manager: PodManager,
+        container_name: str,
+        post_termination_timeout: int = 120,
+        read_pod_cache_timeout: int = 120,
+    ):
+        self.response = response
+        self.pod = pod
+        self.pod_manager = pod_manager
+        self.container_name = container_name
+        self.post_termination_timeout = post_termination_timeout
+        self.last_read_pod_at = None
+        self.read_pod_cache = None
+        self.read_pod_cache_timeout = read_pod_cache_timeout
+
+    def __iter__(self) -> Generator[bytes, None, None]:
+        r"""The generator yields log items divided by the '\n' symbol."""
+        incomplete_log_item: list[bytes] = []
+        if self.logs_available():
+            for data_chunk in self.response.stream(amt=None, decode_content=True):
+                if b"\n" in data_chunk:
+                    log_items = data_chunk.split(b"\n")
+                    yield from self._extract_log_items(incomplete_log_item, log_items)
+                    incomplete_log_item = self._save_incomplete_log_item(log_items[-1])
+                else:
+                    incomplete_log_item.append(data_chunk)
+                if not self.logs_available():
+                    break
+        if incomplete_log_item:
+            yield b"".join(incomplete_log_item)
+
+    @staticmethod
+    def _extract_log_items(incomplete_log_item: list[bytes], log_items: list[bytes]):
+        yield b"".join(incomplete_log_item) + log_items[0] + b"\n"
+        for x in log_items[1:-1]:
+            yield x + b"\n"
+
+    @staticmethod
+    def _save_incomplete_log_item(sub_chunk: bytes):
+        return [sub_chunk] if sub_chunk else []
+
+    def logs_available(self):
+        remote_pod = self.read_pod()
+        if container_is_running(pod=remote_pod, container_name=self.container_name):
+            return True
+        container_status = get_container_status(pod=remote_pod, container_name=self.container_name)
+        state = container_status.state if container_status else None
+        terminated = state.terminated if state else None
+        if terminated:
+            termination_time = terminated.finished_at
+            if termination_time:
+                return termination_time + timedelta(seconds=self.post_termination_timeout) > utcnow()
+        return False
+
+    def read_pod(self):
+        _now = utcnow()
+        if (
+            self.read_pod_cache is None
+            or self.last_read_pod_at + timedelta(seconds=self.read_pod_cache_timeout) < _now
+        ):
+            self.read_pod_cache = self.pod_manager.read_pod(self.pod)
+            self.last_read_pod_at = _now
+        return self.read_pod_cache
+
+
 @dataclass
 class PodLoggingStatus:
     """Used for returning the status of the pod and last log time when exiting from `fetch_container_logs`"""
@@ -203,14 +301,22 @@ class PodManager(LoggingMixin):
         return self.fetch_container_logs(pod=pod, container_name=container_name, follow=True)
 
     def fetch_container_logs(
-        self, pod: V1Pod, container_name: str, *, follow=False, since_time: DateTime | None = None
+        self,
+        pod: V1Pod,
+        container_name: str,
+        *,
+        follow=False,
+        since_time: DateTime | None = None,
+        post_termination_timeout: int = 120,
     ) -> PodLoggingStatus:
         """
         Follows the logs of container and streams to airflow logging.
         Returns when container exits.
         """
 
-        def consume_logs(*, since_time: DateTime | None = None, follow: bool = True) -> DateTime | None:
+        def consume_logs(
+            *, since_time: DateTime | None = None, follow: bool = True, termination_timeout: int = 120
+        ) -> DateTime | None:
             """
             Tries to follow container logs until container completes.
             For a long-running container, sometimes the log read may be interrupted
@@ -228,6 +334,7 @@ class PodManager(LoggingMixin):
                         math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None
                     ),
                     follow=follow,
+                    post_termination_timeout=termination_timeout,
                 )
                 for raw_line in logs:
                     line = raw_line.decode("utf-8", errors="backslashreplace")
@@ -251,7 +358,9 @@ class PodManager(LoggingMixin):
         # So the looping logic is there to let us resume following the logs.
         last_log_time = since_time
         while True:
-            last_log_time = consume_logs(since_time=last_log_time, follow=follow)
+            last_log_time = consume_logs(
+                since_time=last_log_time, follow=follow, termination_timeout=post_termination_timeout
+            )
             if not self.container_is_running(pod, container_name=container_name):
                 return PodLoggingStatus(running=False, last_log_time=last_log_time)
             if not follow:
@@ -327,7 +436,8 @@ class PodManager(LoggingMixin):
         timestamps: bool = False,
         since_seconds: int | None = None,
         follow=True,
-    ) -> Iterable[bytes]:
+        post_termination_timeout: int = 120,
+    ) -> PodLogsConsumer:
         """Reads log from the POD"""
         additional_kwargs = {}
         if since_seconds:
@@ -337,7 +447,7 @@ class PodManager(LoggingMixin):
             additional_kwargs["tail_lines"] = tail_lines
 
         try:
-            return self._client.read_namespaced_pod_log(
+            logs = self._client.read_namespaced_pod_log(
                 name=pod.metadata.name,
                 namespace=pod.metadata.namespace,
                 container=container_name,
@@ -350,6 +460,14 @@ class PodManager(LoggingMixin):
             self.log.exception("There was an error reading the kubernetes API.")
             raise
 
+        return PodLogsConsumer(
+            response=logs,
+            pod=pod,
+            pod_manager=self,
+            container_name=container_name,
+            post_termination_timeout=post_termination_timeout,
+        )
+
     @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
     def read_pod_events(self, pod: V1Pod) -> CoreV1EventList:
         """Reads events from the POD"""
diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
index a089e86119..0e3e84f1e5 100644
--- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
+++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
@@ -17,18 +17,26 @@
 from __future__ import annotations
 
 import logging
+from datetime import datetime
 from unittest import mock
 from unittest.mock import MagicMock
 
 import pendulum
 import pytest
+import time_machine
 from kubernetes.client.rest import ApiException
 from pendulum import DateTime
 from pendulum.tz.timezone import Timezone
 from urllib3.exceptions import HTTPError as BaseHTTPError
 
 from airflow.exceptions import AirflowException
-from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager, PodPhase, container_is_running
+from airflow.providers.cncf.kubernetes.utils.pod_manager import (
+    PodLogsConsumer,
+    PodManager,
+    PodPhase,
+    container_is_running,
+)
+from airflow.utils.timezone import utc
 from tests.test_utils.providers import get_provider_version, object_exists
 
 
@@ -41,7 +49,8 @@ class TestPodManager:
         mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod_log.return_value = mock.sentinel.logs
         logs = self.pod_manager.read_pod_logs(pod=mock.sentinel, container_name="base")
-        assert mock.sentinel.logs == logs
+        assert type(logs) == PodLogsConsumer
+        assert logs.response == mock.sentinel.logs
 
     def test_read_pod_logs_retries_successfully(self):
         mock.sentinel.metadata = mock.MagicMock()
@@ -50,7 +59,8 @@ class TestPodManager:
             mock.sentinel.logs,
         ]
         logs = self.pod_manager.read_pod_logs(pod=mock.sentinel, container_name="base")
-        assert mock.sentinel.logs == logs
+        assert type(logs) == PodLogsConsumer
+        assert mock.sentinel.logs == logs.response
         self.mock_kube_client.read_namespaced_pod_log.assert_has_calls(
             [
                 mock.call(
@@ -86,7 +96,8 @@ class TestPodManager:
         mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod_log.side_effect = [mock.sentinel.logs]
         logs = self.pod_manager.read_pod_logs(pod=mock.sentinel, container_name="base", tail_lines=100)
-        assert mock.sentinel.logs == logs
+        assert type(logs) == PodLogsConsumer
+        assert mock.sentinel.logs == logs.response
         self.mock_kube_client.read_namespaced_pod_log.assert_has_calls(
             [
                 mock.call(
@@ -105,7 +116,8 @@ class TestPodManager:
         mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod_log.side_effect = [mock.sentinel.logs]
         logs = self.pod_manager.read_pod_logs(mock.sentinel, "base", since_seconds=2)
-        assert mock.sentinel.logs == logs
+        assert type(logs) == PodLogsConsumer
+        assert mock.sentinel.logs == logs.response
         self.mock_kube_client.read_namespaced_pod_log.assert_has_calls(
             [
                 mock.call(
@@ -191,7 +203,8 @@ class TestPodManager:
                 yield pod_info_succeeded
 
         self.mock_kube_client.read_namespaced_pod.side_effect = pod_state_gen()
-        self.mock_kube_client.read_namespaced_pod_log.return_value = iter(())
+        mock_response = mock.MagicMock(stream=mock.MagicMock(return_value=iter(())))
+        self.mock_kube_client.read_namespaced_pod_log.return_value = mock_response
         self.pod_manager.fetch_container_logs(mock.sentinel, "base")
 
     def test_monitor_pod_logs_failures_non_fatal(self):
@@ -293,29 +306,34 @@ class TestPodManager:
 
     @pytest.mark.parametrize("follow", [True, False])
     @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running")
-    def test_fetch_container_done(self, container_running, follow):
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodLogsConsumer.logs_available")
+    def test_fetch_container_done(self, logs_available, container_running, follow):
         """If container done, should exit, no matter setting of follow."""
         mock_pod = MagicMock()
+        logs_available.return_value = False
         container_running.return_value = False
-        self.mock_kube_client.read_namespaced_pod_log.return_value = [b"2021-01-01 hi"]
         ret = self.pod_manager.fetch_container_logs(pod=mock_pod, container_name="base", follow=follow)
-        assert ret.last_log_time == DateTime(2021, 1, 1, tzinfo=Timezone("UTC"))
+        assert ret.last_log_time is None
         assert ret.running is False
 
     @mock.patch("pendulum.now")
     @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running")
-    def test_fetch_container_since_time(self, container_running, mock_now):
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodLogsConsumer.logs_available")
+    def test_fetch_container_since_time(self, logs_available, container_running, mock_now):
         """If given since_time, should be used."""
         mock_pod = MagicMock()
         mock_now.return_value = DateTime(2020, 1, 1, 0, 0, 5, tzinfo=Timezone("UTC"))
+        logs_available.return_value = True
         container_running.return_value = False
-        self.mock_kube_client.read_namespaced_pod_log.return_value = [b"2021-01-01 hi"]
+        self.mock_kube_client.read_namespaced_pod_log.return_value = mock.MagicMock(
+            stream=mock.MagicMock(return_value=[b"2021-01-01 hi"])
+        )
         since_time = DateTime(2020, 1, 1, tzinfo=Timezone("UTC"))
         self.pod_manager.fetch_container_logs(pod=mock_pod, container_name="base", since_time=since_time)
         args, kwargs = self.mock_kube_client.read_namespaced_pod_log.call_args_list[0]
         assert kwargs["since_seconds"] == 5
 
-    @pytest.mark.parametrize("follow, is_running_calls, exp_running", [(True, 3, False), (False, 1, True)])
+    @pytest.mark.parametrize("follow, is_running_calls, exp_running", [(True, 3, False), (False, 3, False)])
     @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running")
     def test_fetch_container_running_follow(
         self, container_running_mock, follow, is_running_calls, exp_running
@@ -325,8 +343,10 @@ class TestPodManager:
         When called with follow=False, should return immediately even though still running.
         """
         mock_pod = MagicMock()
-        container_running_mock.side_effect = [True, True, False]  # only will be called once
-        self.mock_kube_client.read_namespaced_pod_log.return_value = [b"2021-01-01 hi"]
+        container_running_mock.side_effect = [True, True, False]
+        self.mock_kube_client.read_namespaced_pod_log.return_value = mock.MagicMock(
+            stream=mock.MagicMock(return_value=[b"2021-01-01 hi"])
+        )
         ret = self.pod_manager.fetch_container_logs(pod=mock_pod, container_name="base", follow=follow)
         assert len(container_running_mock.call_args_list) == is_running_calls
         assert ret.last_log_time == DateTime(2021, 1, 1, tzinfo=Timezone("UTC"))
@@ -403,3 +423,181 @@ def test_container_is_running(remote_pod, result):
     an object `e` such that `e.status.container_statuses` is None, and so on.  This test
     verifies the expected behavior."""
     assert container_is_running(remote_pod, "base") is result
+
+
+class TestPodLogsConsumer:
+    @pytest.mark.parametrize(
+        "chunks, expected_logs",
+        [
+            ([b"message"], [b"message"]),
+            ([b"message1\nmessage2"], [b"message1\n", b"message2"]),
+            ([b"message1\n", b"message2"], [b"message1\n", b"message2"]),
+            ([b"first_part", b"_second_part"], [b"first_part_second_part"]),
+            ([b""], [b""]),
+        ],
+    )
+    def test_chunks(self, chunks, expected_logs):
+        with mock.patch.object(PodLogsConsumer, "logs_available") as logs_available:
+            logs_available.return_value = True
+            consumer = PodLogsConsumer(
+                response=mock.MagicMock(stream=mock.MagicMock(return_value=chunks)),
+                pod=mock.MagicMock(),
+                pod_manager=mock.MagicMock(container_is_running=mock.MagicMock(return_value=True)),
+                container_name="base",
+            )
+            assert list(consumer) == expected_logs
+
+    def test_container_is_not_running(self):
+        with mock.patch.object(PodLogsConsumer, "logs_available") as logs_available:
+            logs_available.return_value = False
+            consumer = PodLogsConsumer(
+                response=mock.MagicMock(stream=mock.MagicMock(return_value=[b"message1", b"message2"])),
+                pod=mock.MagicMock(),
+                pod_manager=mock.MagicMock(container_is_running=mock.MagicMock(return_value=False)),
+                container_name="base",
+            )
+            assert list(consumer) == []
+
+    @pytest.mark.parametrize(
+        "container_run, termination_time, now_time, post_termination_timeout, expected_logs_available",
+        [
+            (
+                False,
+                datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=utc),
+                datetime(2022, 1, 1, 0, 1, 0, 0, tzinfo=utc),
+                120,
+                True,
+            ),
+            (
+                False,
+                datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=utc),
+                datetime(2022, 1, 1, 0, 2, 0, 0, tzinfo=utc),
+                120,
+                False,
+            ),
+            (
+                False,
+                datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=utc),
+                datetime(2022, 1, 1, 0, 5, 0, 0, tzinfo=utc),
+                120,
+                False,
+            ),
+            (
+                True,
+                datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=utc),
+                datetime(2022, 1, 1, 0, 1, 0, 0, tzinfo=utc),
+                120,
+                True,
+            ),
+            (
+                True,
+                datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=utc),
+                datetime(2022, 1, 1, 0, 2, 0, 0, tzinfo=utc),
+                120,
+                True,
+            ),
+            (
+                True,
+                datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=utc),
+                datetime(2022, 1, 1, 0, 5, 0, 0, tzinfo=utc),
+                120,
+                True,
+            ),
+        ],
+    )
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running")
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_container_status")
+    def test_logs_available(
+        self,
+        mock_get_container_status,
+        mock_container_is_running,
+        container_run,
+        termination_time,
+        now_time,
+        post_termination_timeout,
+        expected_logs_available,
+    ):
+        mock_container_is_running.return_value = container_run
+        mock_get_container_status.return_value = mock.MagicMock(
+            state=mock.MagicMock(terminated=mock.MagicMock(finished_at=termination_time))
+        )
+        with time_machine.travel(now_time):
+            consumer = PodLogsConsumer(
+                response=mock.MagicMock(),
+                pod=mock.MagicMock(),
+                pod_manager=mock.MagicMock(),
+                container_name="base",
+                post_termination_timeout=post_termination_timeout,
+            )
+            assert consumer.logs_available() == expected_logs_available
+
+    @pytest.mark.parametrize(
+        "read_pod_cache_timeout, mock_read_pod_at_0, mock_read_pod_at_1, mock_read_pods, expected_read_pods",
+        [
+            (
+                120,
+                datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=utc),
+                datetime(2023, 1, 1, 0, 1, 0, 0, tzinfo=utc),
+                ["Read pod #0", "Read pod #1"],
+                ["Read pod #0", "Read pod #0"],
+            ),
+            (
+                120,
+                datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=utc),
+                datetime(2023, 1, 1, 0, 2, 0, 0, tzinfo=utc),
+                ["Read pod #0", "Read pod #1"],
+                ["Read pod #0", "Read pod #0"],
+            ),
+            (
+                120,
+                datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=utc),
+                datetime(2023, 1, 1, 0, 3, 0, 0, tzinfo=utc),
+                ["Read pod #0", "Read pod #1"],
+                ["Read pod #0", "Read pod #1"],
+            ),
+            (
+                2,
+                datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=utc),
+                datetime(2023, 1, 1, 0, 0, 1, 0, tzinfo=utc),
+                ["Read pod #0", "Read pod #1"],
+                ["Read pod #0", "Read pod #0"],
+            ),
+            (
+                2,
+                datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=utc),
+                datetime(2023, 1, 1, 0, 0, 2, 0, tzinfo=utc),
+                ["Read pod #0", "Read pod #1"],
+                ["Read pod #0", "Read pod #0"],
+            ),
+            (
+                2,
+                datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=utc),
+                datetime(2023, 1, 1, 0, 0, 3, 0, tzinfo=utc),
+                ["Read pod #0", "Read pod #1"],
+                ["Read pod #0", "Read pod #1"],
+            ),
+        ],
+    )
+    def test_read_pod(
+        self,
+        read_pod_cache_timeout,
+        mock_read_pod_at_0,
+        mock_read_pod_at_1,
+        mock_read_pods,
+        expected_read_pods,
+    ):
+        consumer = PodLogsConsumer(
+            response=mock.MagicMock(),
+            pod=mock.MagicMock(),
+            pod_manager=mock.MagicMock(),
+            container_name="base",
+            read_pod_cache_timeout=read_pod_cache_timeout,
+        )
+        consumer.pod_manager.read_pod.side_effect = mock_read_pods
+        # first read
+        with time_machine.travel(mock_read_pod_at_0):
+            assert consumer.read_pod() == expected_read_pods[0]
+
+        # second read
+        with time_machine.travel(mock_read_pod_at_1):
+            assert consumer.read_pod() == expected_read_pods[1]
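
A note on the new tests: they rely on time_machine to pin "now" so that both the post-termination window in logs_available() and the read_pod() cache expiry can be asserted deterministically. The snippet below is a generic illustration of that pattern, not part of the commit; within_window is a made-up stand-in for the finished_at + post_termination_timeout comparison in PodLogsConsumer.logs_available().

from datetime import datetime, timedelta, timezone

import time_machine


def within_window(finished_at, timeout_seconds=120):
    # Mirrors the check in logs_available(): logs are still considered available
    # until timeout_seconds after the container finished.
    return finished_at + timedelta(seconds=timeout_seconds) > datetime.now(timezone.utc)


finished = datetime(2022, 1, 1, 0, 0, 0, tzinfo=timezone.utc)

with time_machine.travel(datetime(2022, 1, 1, 0, 1, 0, tzinfo=timezone.utc)):
    assert within_window(finished)       # 60s after termination: still inside the 120s window

with time_machine.travel(datetime(2022, 1, 1, 0, 2, 0, tzinfo=timezone.utc)):
    assert not within_window(finished)   # 120s after termination: the window has closed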