You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/03/04 18:08:18 UTC
[airflow] branch main updated: Fixed hanged KubernetesPodOperator (#28336)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6d2face107 Fixed hanged KubernetesPodOperator (#28336)
6d2face107 is described below
commit 6d2face107f24b7e7dce4b98ae3def1178e1fc4c
Author: max <42...@users.noreply.github.com>
AuthorDate: Sat Mar 4 19:08:09 2023 +0100
Fixed hanged KubernetesPodOperator (#28336)
Co-authored-by: Maksim Moiseenkov <mo...@google.com>
---
.../cncf/kubernetes/operators/kubernetes_pod.py | 2 +
.../providers/cncf/kubernetes/utils/pod_manager.py | 140 ++++++++++++-
.../cncf/kubernetes/utils/test_pod_manager.py | 226 +++++++++++++++++++--
3 files changed, 343 insertions(+), 25 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index bf889580bc..711607bbe4 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -225,6 +225,7 @@ class KubernetesPodOperator(BaseOperator):
BASE_CONTAINER_NAME = "base"
POD_CHECKED_KEY = "already_checked"
+ POST_TERMINATION_TIMEOUT = 120
template_fields: Sequence[str] = (
"image",
@@ -531,6 +532,7 @@ class KubernetesPodOperator(BaseOperator):
pod=self.pod,
container_name=self.base_container_name,
follow=True,
+ post_termination_timeout=self.POST_TERMINATION_TIMEOUT,
)
else:
self.pod_manager.await_container_completion(
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 56ef95eef0..3345c63cc8 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -23,23 +23,26 @@ import time
import warnings
from contextlib import closing, suppress
from dataclasses import dataclass
-from datetime import datetime
-from typing import TYPE_CHECKING, Iterable, cast
+from datetime import datetime, timedelta
+from typing import TYPE_CHECKING, Generator, cast
import pendulum
import tenacity
from kubernetes import client, watch
+from kubernetes.client.models.v1_container_status import V1ContainerStatus
from kubernetes.client.models.v1_pod import V1Pod
from kubernetes.client.rest import ApiException
from kubernetes.stream import stream as kubernetes_stream
from pendulum import DateTime
from pendulum.parsing.exceptions import ParserError
from urllib3.exceptions import HTTPError as BaseHTTPError
+from urllib3.response import HTTPResponse
from airflow.exceptions import AirflowException
from airflow.kubernetes.kube_client import get_kube_client
from airflow.kubernetes.pod_generator import PodDefaults
from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.timezone import utcnow
if TYPE_CHECKING:
from kubernetes.client.models.core_v1_event_list import CoreV1EventList
@@ -70,15 +73,24 @@ class PodPhase:
terminal_states = {FAILED, SUCCEEDED}
+def get_container_status(pod: V1Pod, container_name: str) -> V1ContainerStatus | None:
+ """Retrieves container status"""
+ container_statuses = pod.status.container_statuses if pod and pod.status else None
+ if container_statuses:
+ # In general the variable container_statuses can store multiple items matching different containers.
+ # The following generator expression yields all items that have name equal to the container_name.
+ # The function next() here calls the generator to get only the first value. If there's nothing found
+ # then None is returned.
+ return next((x for x in container_statuses if x.name == container_name), None)
+ return None
+
+
def container_is_running(pod: V1Pod, container_name: str) -> bool:
"""
Examines V1Pod ``pod`` to determine whether ``container_name`` is running.
If that container is present and running, returns True. Returns False otherwise.
"""
- container_statuses = pod.status.container_statuses if pod and pod.status else None
- if not container_statuses:
- return False
- container_status = next((x for x in container_statuses if x.name == container_name), None)
+ container_status = get_container_status(pod, container_name)
if not container_status:
return False
return container_status.state.running is not None
@@ -91,6 +103,92 @@ def get_container_termination_message(pod: V1Pod, container_name: str):
return container_status.state.terminated.message if container_status else None
+class PodLogsConsumer:
+ """
+ PodLogsConsumer is responsible for pulling pod logs from a stream, checking the container status
+ before reading data.
+ This class is a workaround for the issue https://github.com/apache/airflow/issues/23497
+
+ :param response: HTTP response with logs
+ :param pod: Pod instance from Kubernetes client
+ :param pod_manager: Pod manager instance
+ :param container_name: Name of the container that we're reading logs from
+ :param post_termination_timeout: (Optional) The number of seconds for which logs are still
+ considered available after the container has terminated.
+ :param read_pod_cache_timeout: (Optional) The lifetime, in seconds, of the container's status cache.
+ The container status is cached to reduce API calls.
+
+ :meta private:
+ """
+
+ def __init__(
+ self,
+ response: HTTPResponse,
+ pod: V1Pod,
+ pod_manager: PodManager,
+ container_name: str,
+ post_termination_timeout: int = 120,
+ read_pod_cache_timeout: int = 120,
+ ):
+ self.response = response
+ self.pod = pod
+ self.pod_manager = pod_manager
+ self.container_name = container_name
+ self.post_termination_timeout = post_termination_timeout
+ self.last_read_pod_at = None
+ self.read_pod_cache = None
+ self.read_pod_cache_timeout = read_pod_cache_timeout
+
+ def __iter__(self) -> Generator[bytes, None, None]:
+ r"""The generator yields log items divided by the '\n' symbol."""
+ incomplete_log_item: list[bytes] = []
+ if self.logs_available():
+ for data_chunk in self.response.stream(amt=None, decode_content=True):
+ if b"\n" in data_chunk:
+ log_items = data_chunk.split(b"\n")
+ yield from self._extract_log_items(incomplete_log_item, log_items)
+ incomplete_log_item = self._save_incomplete_log_item(log_items[-1])
+ else:
+ incomplete_log_item.append(data_chunk)
+ if not self.logs_available():
+ break
+ if incomplete_log_item:
+ yield b"".join(incomplete_log_item)
+
+ @staticmethod
+ def _extract_log_items(incomplete_log_item: list[bytes], log_items: list[bytes]):
+ yield b"".join(incomplete_log_item) + log_items[0] + b"\n"
+ for x in log_items[1:-1]:
+ yield x + b"\n"
+
+ @staticmethod
+ def _save_incomplete_log_item(sub_chunk: bytes):
+ return [sub_chunk] if [sub_chunk] else []
+
+ def logs_available(self):
+ remote_pod = self.read_pod()
+ if container_is_running(pod=remote_pod, container_name=self.container_name):
+ return True
+ container_status = get_container_status(pod=remote_pod, container_name=self.container_name)
+ state = container_status.state if container_status else None
+ terminated = state.terminated if state else None
+ if terminated:
+ termination_time = terminated.finished_at
+ if termination_time:
+ return termination_time + timedelta(seconds=self.post_termination_timeout) > utcnow()
+ return False
+
+ def read_pod(self):
+ _now = utcnow()
+ if (
+ self.read_pod_cache is None
+ or self.last_read_pod_at + timedelta(seconds=self.read_pod_cache_timeout) < _now
+ ):
+ self.read_pod_cache = self.pod_manager.read_pod(self.pod)
+ self.last_read_pod_at = _now
+ return self.read_pod_cache
+
+
@dataclass
class PodLoggingStatus:
"""Used for returning the status of the pod and last log time when exiting from `fetch_container_logs`"""
@@ -203,14 +301,22 @@ class PodManager(LoggingMixin):
return self.fetch_container_logs(pod=pod, container_name=container_name, follow=True)
def fetch_container_logs(
- self, pod: V1Pod, container_name: str, *, follow=False, since_time: DateTime | None = None
+ self,
+ pod: V1Pod,
+ container_name: str,
+ *,
+ follow=False,
+ since_time: DateTime | None = None,
+ post_termination_timeout: int = 120,
) -> PodLoggingStatus:
"""
Follows the logs of container and streams to airflow logging.
Returns when container exits.
"""
- def consume_logs(*, since_time: DateTime | None = None, follow: bool = True) -> DateTime | None:
+ def consume_logs(
+ *, since_time: DateTime | None = None, follow: bool = True, termination_timeout: int = 120
+ ) -> DateTime | None:
"""
Tries to follow container logs until container completes.
For a long-running container, sometimes the log read may be interrupted
@@ -228,6 +334,7 @@ class PodManager(LoggingMixin):
math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None
),
follow=follow,
+ post_termination_timeout=termination_timeout,
)
for raw_line in logs:
line = raw_line.decode("utf-8", errors="backslashreplace")
@@ -251,7 +358,9 @@ class PodManager(LoggingMixin):
# So the looping logic is there to let us resume following the logs.
last_log_time = since_time
while True:
- last_log_time = consume_logs(since_time=last_log_time, follow=follow)
+ last_log_time = consume_logs(
+ since_time=last_log_time, follow=follow, termination_timeout=post_termination_timeout
+ )
if not self.container_is_running(pod, container_name=container_name):
return PodLoggingStatus(running=False, last_log_time=last_log_time)
if not follow:
@@ -327,7 +436,8 @@ class PodManager(LoggingMixin):
timestamps: bool = False,
since_seconds: int | None = None,
follow=True,
- ) -> Iterable[bytes]:
+ post_termination_timeout: int = 120,
+ ) -> PodLogsConsumer:
"""Reads log from the POD"""
additional_kwargs = {}
if since_seconds:
@@ -337,7 +447,7 @@ class PodManager(LoggingMixin):
additional_kwargs["tail_lines"] = tail_lines
try:
- return self._client.read_namespaced_pod_log(
+ logs = self._client.read_namespaced_pod_log(
name=pod.metadata.name,
namespace=pod.metadata.namespace,
container=container_name,
@@ -350,6 +460,14 @@ class PodManager(LoggingMixin):
self.log.exception("There was an error reading the kubernetes API.")
raise
+ return PodLogsConsumer(
+ response=logs,
+ pod=pod,
+ pod_manager=self,
+ container_name=container_name,
+ post_termination_timeout=post_termination_timeout,
+ )
+
@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def read_pod_events(self, pod: V1Pod) -> CoreV1EventList:
"""Reads events from the POD"""
diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
index a089e86119..0e3e84f1e5 100644
--- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
+++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
@@ -17,18 +17,26 @@
from __future__ import annotations
import logging
+from datetime import datetime
from unittest import mock
from unittest.mock import MagicMock
import pendulum
import pytest
+import time_machine
from kubernetes.client.rest import ApiException
from pendulum import DateTime
from pendulum.tz.timezone import Timezone
from urllib3.exceptions import HTTPError as BaseHTTPError
from airflow.exceptions import AirflowException
-from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager, PodPhase, container_is_running
+from airflow.providers.cncf.kubernetes.utils.pod_manager import (
+ PodLogsConsumer,
+ PodManager,
+ PodPhase,
+ container_is_running,
+)
+from airflow.utils.timezone import utc
from tests.test_utils.providers import get_provider_version, object_exists
@@ -41,7 +49,8 @@ class TestPodManager:
mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.read_namespaced_pod_log.return_value = mock.sentinel.logs
logs = self.pod_manager.read_pod_logs(pod=mock.sentinel, container_name="base")
- assert mock.sentinel.logs == logs
+ assert type(logs) == PodLogsConsumer
+ assert logs.response == mock.sentinel.logs
def test_read_pod_logs_retries_successfully(self):
mock.sentinel.metadata = mock.MagicMock()
@@ -50,7 +59,8 @@ class TestPodManager:
mock.sentinel.logs,
]
logs = self.pod_manager.read_pod_logs(pod=mock.sentinel, container_name="base")
- assert mock.sentinel.logs == logs
+ assert type(logs) == PodLogsConsumer
+ assert mock.sentinel.logs == logs.response
self.mock_kube_client.read_namespaced_pod_log.assert_has_calls(
[
mock.call(
@@ -86,7 +96,8 @@ class TestPodManager:
mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.read_namespaced_pod_log.side_effect = [mock.sentinel.logs]
logs = self.pod_manager.read_pod_logs(pod=mock.sentinel, container_name="base", tail_lines=100)
- assert mock.sentinel.logs == logs
+ assert type(logs) == PodLogsConsumer
+ assert mock.sentinel.logs == logs.response
self.mock_kube_client.read_namespaced_pod_log.assert_has_calls(
[
mock.call(
@@ -105,7 +116,8 @@ class TestPodManager:
mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.read_namespaced_pod_log.side_effect = [mock.sentinel.logs]
logs = self.pod_manager.read_pod_logs(mock.sentinel, "base", since_seconds=2)
- assert mock.sentinel.logs == logs
+ assert type(logs) == PodLogsConsumer
+ assert mock.sentinel.logs == logs.response
self.mock_kube_client.read_namespaced_pod_log.assert_has_calls(
[
mock.call(
@@ -191,7 +203,8 @@ class TestPodManager:
yield pod_info_succeeded
self.mock_kube_client.read_namespaced_pod.side_effect = pod_state_gen()
- self.mock_kube_client.read_namespaced_pod_log.return_value = iter(())
+ mock_response = mock.MagicMock(stream=mock.MagicMock(return_value=iter(())))
+ self.mock_kube_client.read_namespaced_pod_log.return_value = mock_response
self.pod_manager.fetch_container_logs(mock.sentinel, "base")
def test_monitor_pod_logs_failures_non_fatal(self):
@@ -293,29 +306,34 @@ class TestPodManager:
@pytest.mark.parametrize("follow", [True, False])
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running")
- def test_fetch_container_done(self, container_running, follow):
+ @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodLogsConsumer.logs_available")
+ def test_fetch_container_done(self, logs_available, container_running, follow):
"""If container done, should exit, no matter setting of follow."""
mock_pod = MagicMock()
+ logs_available.return_value = False
container_running.return_value = False
- self.mock_kube_client.read_namespaced_pod_log.return_value = [b"2021-01-01 hi"]
ret = self.pod_manager.fetch_container_logs(pod=mock_pod, container_name="base", follow=follow)
- assert ret.last_log_time == DateTime(2021, 1, 1, tzinfo=Timezone("UTC"))
+ assert ret.last_log_time is None
assert ret.running is False
@mock.patch("pendulum.now")
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running")
- def test_fetch_container_since_time(self, container_running, mock_now):
+ @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodLogsConsumer.logs_available")
+ def test_fetch_container_since_time(self, logs_available, container_running, mock_now):
"""If given since_time, should be used."""
mock_pod = MagicMock()
mock_now.return_value = DateTime(2020, 1, 1, 0, 0, 5, tzinfo=Timezone("UTC"))
+ logs_available.return_value = True
container_running.return_value = False
- self.mock_kube_client.read_namespaced_pod_log.return_value = [b"2021-01-01 hi"]
+ self.mock_kube_client.read_namespaced_pod_log.return_value = mock.MagicMock(
+ stream=mock.MagicMock(return_value=[b"2021-01-01 hi"])
+ )
since_time = DateTime(2020, 1, 1, tzinfo=Timezone("UTC"))
self.pod_manager.fetch_container_logs(pod=mock_pod, container_name="base", since_time=since_time)
args, kwargs = self.mock_kube_client.read_namespaced_pod_log.call_args_list[0]
assert kwargs["since_seconds"] == 5
- @pytest.mark.parametrize("follow, is_running_calls, exp_running", [(True, 3, False), (False, 1, True)])
+ @pytest.mark.parametrize("follow, is_running_calls, exp_running", [(True, 3, False), (False, 3, False)])
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running")
def test_fetch_container_running_follow(
self, container_running_mock, follow, is_running_calls, exp_running
@@ -325,8 +343,10 @@ class TestPodManager:
When called with follow=False, should return immediately even though still running.
"""
mock_pod = MagicMock()
- container_running_mock.side_effect = [True, True, False] # only will be called once
- self.mock_kube_client.read_namespaced_pod_log.return_value = [b"2021-01-01 hi"]
+ container_running_mock.side_effect = [True, True, False]
+ self.mock_kube_client.read_namespaced_pod_log.return_value = mock.MagicMock(
+ stream=mock.MagicMock(return_value=[b"2021-01-01 hi"])
+ )
ret = self.pod_manager.fetch_container_logs(pod=mock_pod, container_name="base", follow=follow)
assert len(container_running_mock.call_args_list) == is_running_calls
assert ret.last_log_time == DateTime(2021, 1, 1, tzinfo=Timezone("UTC"))
@@ -403,3 +423,181 @@ def test_container_is_running(remote_pod, result):
an object `e` such that `e.status.container_statuses` is None, and so on. This test
verifies the expected behavior."""
assert container_is_running(remote_pod, "base") is result
+
+
+class TestPodLogsConsumer:
+ @pytest.mark.parametrize(
+ "chunks, expected_logs",
+ [
+ ([b"message"], [b"message"]),
+ ([b"message1\nmessage2"], [b"message1\n", b"message2"]),
+ ([b"message1\n", b"message2"], [b"message1\n", b"message2"]),
+ ([b"first_part", b"_second_part"], [b"first_part_second_part"]),
+ ([b""], [b""]),
+ ],
+ )
+ def test_chunks(self, chunks, expected_logs):
+ with mock.patch.object(PodLogsConsumer, "logs_available") as logs_available:
+ logs_available.return_value = True
+ consumer = PodLogsConsumer(
+ response=mock.MagicMock(stream=mock.MagicMock(return_value=chunks)),
+ pod=mock.MagicMock(),
+ pod_manager=mock.MagicMock(container_is_running=mock.MagicMock(return_value=True)),
+ container_name="base",
+ )
+ assert list(consumer) == expected_logs
+
+ def test_container_is_not_running(self):
+ with mock.patch.object(PodLogsConsumer, "logs_available") as logs_available:
+ logs_available.return_value = False
+ consumer = PodLogsConsumer(
+ response=mock.MagicMock(stream=mock.MagicMock(return_value=[b"message1", b"message2"])),
+ pod=mock.MagicMock(),
+ pod_manager=mock.MagicMock(container_is_running=mock.MagicMock(return_value=False)),
+ container_name="base",
+ )
+ assert list(consumer) == []
+
+ @pytest.mark.parametrize(
+ "container_run, termination_time, now_time, post_termination_timeout, expected_logs_available",
+ [
+ (
+ False,
+ datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=utc),
+ datetime(2022, 1, 1, 0, 1, 0, 0, tzinfo=utc),
+ 120,
+ True,
+ ),
+ (
+ False,
+ datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=utc),
+ datetime(2022, 1, 1, 0, 2, 0, 0, tzinfo=utc),
+ 120,
+ False,
+ ),
+ (
+ False,
+ datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=utc),
+ datetime(2022, 1, 1, 0, 5, 0, 0, tzinfo=utc),
+ 120,
+ False,
+ ),
+ (
+ True,
+ datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=utc),
+ datetime(2022, 1, 1, 0, 1, 0, 0, tzinfo=utc),
+ 120,
+ True,
+ ),
+ (
+ True,
+ datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=utc),
+ datetime(2022, 1, 1, 0, 2, 0, 0, tzinfo=utc),
+ 120,
+ True,
+ ),
+ (
+ True,
+ datetime(2022, 1, 1, 0, 0, 0, 0, tzinfo=utc),
+ datetime(2022, 1, 1, 0, 5, 0, 0, tzinfo=utc),
+ 120,
+ True,
+ ),
+ ],
+ )
+ @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running")
+ @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.get_container_status")
+ def test_logs_available(
+ self,
+ mock_get_container_status,
+ mock_container_is_running,
+ container_run,
+ termination_time,
+ now_time,
+ post_termination_timeout,
+ expected_logs_available,
+ ):
+ mock_container_is_running.return_value = container_run
+ mock_get_container_status.return_value = mock.MagicMock(
+ state=mock.MagicMock(terminated=mock.MagicMock(finished_at=termination_time))
+ )
+ with time_machine.travel(now_time):
+ consumer = PodLogsConsumer(
+ response=mock.MagicMock(),
+ pod=mock.MagicMock(),
+ pod_manager=mock.MagicMock(),
+ container_name="base",
+ post_termination_timeout=post_termination_timeout,
+ )
+ assert consumer.logs_available() == expected_logs_available
+
+ @pytest.mark.parametrize(
+ "read_pod_cache_timeout, mock_read_pod_at_0, mock_read_pod_at_1, mock_read_pods, expected_read_pods",
+ [
+ (
+ 120,
+ datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=utc),
+ datetime(2023, 1, 1, 0, 1, 0, 0, tzinfo=utc),
+ ["Read pod #0", "Read pod #1"],
+ ["Read pod #0", "Read pod #0"],
+ ),
+ (
+ 120,
+ datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=utc),
+ datetime(2023, 1, 1, 0, 2, 0, 0, tzinfo=utc),
+ ["Read pod #0", "Read pod #1"],
+ ["Read pod #0", "Read pod #0"],
+ ),
+ (
+ 120,
+ datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=utc),
+ datetime(2023, 1, 1, 0, 3, 0, 0, tzinfo=utc),
+ ["Read pod #0", "Read pod #1"],
+ ["Read pod #0", "Read pod #1"],
+ ),
+ (
+ 2,
+ datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=utc),
+ datetime(2023, 1, 1, 0, 0, 1, 0, tzinfo=utc),
+ ["Read pod #0", "Read pod #1"],
+ ["Read pod #0", "Read pod #0"],
+ ),
+ (
+ 2,
+ datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=utc),
+ datetime(2023, 1, 1, 0, 0, 2, 0, tzinfo=utc),
+ ["Read pod #0", "Read pod #1"],
+ ["Read pod #0", "Read pod #0"],
+ ),
+ (
+ 2,
+ datetime(2023, 1, 1, 0, 0, 0, 0, tzinfo=utc),
+ datetime(2023, 1, 1, 0, 0, 3, 0, tzinfo=utc),
+ ["Read pod #0", "Read pod #1"],
+ ["Read pod #0", "Read pod #1"],
+ ),
+ ],
+ )
+ def test_read_pod(
+ self,
+ read_pod_cache_timeout,
+ mock_read_pod_at_0,
+ mock_read_pod_at_1,
+ mock_read_pods,
+ expected_read_pods,
+ ):
+ consumer = PodLogsConsumer(
+ response=mock.MagicMock(),
+ pod=mock.MagicMock(),
+ pod_manager=mock.MagicMock(),
+ container_name="base",
+ read_pod_cache_timeout=read_pod_cache_timeout,
+ )
+ consumer.pod_manager.read_pod.side_effect = mock_read_pods
+ # first read
+ with time_machine.travel(mock_read_pod_at_0):
+ assert consumer.read_pod() == expected_read_pods[0]
+
+ # second read
+ with time_machine.travel(mock_read_pod_at_1):
+ assert consumer.read_pod() == expected_read_pods[1]