You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/03/19 15:06:27 UTC
[airflow] 25/42: BugFix: TypeError in monitor_pod (#14513)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 87c26b464a7bba388c4830982fd11e7ae5384fa8
Author: Emil Ejbyfeldt <ee...@liveintent.com>
AuthorDate: Mon Mar 1 14:58:01 2021 +0100
BugFix: TypeError in monitor_pod (#14513)
If the log read is interrupted before any logs are produced then
`last_log_time` will not be set and the line
`delta = pendulum.now() - last_log_time` will fail with
```
TypeError: unsupported operand type(s) for -: 'DateTime' and 'NoneType'
```
This commit fixes this issue by only updating `read_logs_since_sec` if
`last_log_time` has been set.
(cherry picked from commit 45a0ac2e01c174754f4e6612c8e4d3125061d096)
---
airflow/kubernetes/pod_launcher.py | 7 ++++---
tests/kubernetes/test_pod_launcher.py | 18 +++++++++++++++++-
2 files changed, 21 insertions(+), 4 deletions(-)
diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py
index 02194d7..3d663d2 100644
--- a/airflow/kubernetes/pod_launcher.py
+++ b/airflow/kubernetes/pod_launcher.py
@@ -140,9 +140,10 @@ class PodLauncher(LoggingMixin):
break
self.log.warning('Pod %s log read interrupted', pod.metadata.name)
- delta = pendulum.now() - last_log_time
- # Prefer logs duplication rather than loss
- read_logs_since_sec = math.ceil(delta.total_seconds())
+ if last_log_time:
+ delta = pendulum.now() - last_log_time
+ # Prefer logs duplication rather than loss
+ read_logs_since_sec = math.ceil(delta.total_seconds())
result = None
if self.extract_xcom:
while self.base_container_is_running(pod):
diff --git a/tests/kubernetes/test_pod_launcher.py b/tests/kubernetes/test_pod_launcher.py
index 9e7cc82..6e40264 100644
--- a/tests/kubernetes/test_pod_launcher.py
+++ b/tests/kubernetes/test_pod_launcher.py
@@ -21,7 +21,7 @@ import pytest
from requests.exceptions import BaseHTTPError
from airflow.exceptions import AirflowException
-from airflow.kubernetes.pod_launcher import PodLauncher
+from airflow.kubernetes.pod_launcher import PodLauncher, PodStatus
class TestPodLauncher(unittest.TestCase):
@@ -170,6 +170,22 @@ class TestPodLauncher(unittest.TestCase):
]
)
+ def test_monitor_pod_empty_logs(self):
+ mock.sentinel.metadata = mock.MagicMock()
+ running_status = mock.MagicMock()
+ running_status.configure_mock(**{'name': 'base', 'state.running': True})
+ pod_info_running = mock.MagicMock(**{'status.container_statuses': [running_status]})
+ pod_info_succeeded = mock.MagicMock(**{'status.phase': PodStatus.SUCCEEDED})
+
+ def pod_state_gen():
+ yield pod_info_running
+ while True:
+ yield pod_info_succeeded
+
+ self.mock_kube_client.read_namespaced_pod.side_effect = pod_state_gen()
+ self.mock_kube_client.read_namespaced_pod_log.return_value = iter(())
+ self.pod_launcher.monitor_pod(mock.sentinel, get_logs=True)
+
def test_read_pod_retries_fails(self):
mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.read_namespaced_pod.side_effect = [