You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/03/19 15:06:27 UTC

[airflow] 25/42: BugFix: TypeError in monitor_pod (#14513)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 87c26b464a7bba388c4830982fd11e7ae5384fa8
Author: Emil Ejbyfeldt <ee...@liveintent.com>
AuthorDate: Mon Mar 1 14:58:01 2021 +0100

    BugFix: TypeError in monitor_pod (#14513)
    
    If the log read is interrupted before any logs are produced then
    `last_log_time` will not be set and the line
    `delta = pendulum.now() - last_log_time` will fail with
    ```
    TypeError: unsupported operand type(s) for -: 'DateTime' and 'NoneType'
    ```
    
    This commit fixes this issue by only updating `read_logs_since_sec` if
    `last_log_time` has been set.
    
    (cherry picked from commit 45a0ac2e01c174754f4e6612c8e4d3125061d096)
---
 airflow/kubernetes/pod_launcher.py    |  7 ++++---
 tests/kubernetes/test_pod_launcher.py | 18 +++++++++++++++++-
 2 files changed, 21 insertions(+), 4 deletions(-)

diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py
index 02194d7..3d663d2 100644
--- a/airflow/kubernetes/pod_launcher.py
+++ b/airflow/kubernetes/pod_launcher.py
@@ -140,9 +140,10 @@ class PodLauncher(LoggingMixin):
                     break
 
                 self.log.warning('Pod %s log read interrupted', pod.metadata.name)
-                delta = pendulum.now() - last_log_time
-                # Prefer logs duplication rather than loss
-                read_logs_since_sec = math.ceil(delta.total_seconds())
+                if last_log_time:
+                    delta = pendulum.now() - last_log_time
+                    # Prefer logs duplication rather than loss
+                    read_logs_since_sec = math.ceil(delta.total_seconds())
         result = None
         if self.extract_xcom:
             while self.base_container_is_running(pod):
diff --git a/tests/kubernetes/test_pod_launcher.py b/tests/kubernetes/test_pod_launcher.py
index 9e7cc82..6e40264 100644
--- a/tests/kubernetes/test_pod_launcher.py
+++ b/tests/kubernetes/test_pod_launcher.py
@@ -21,7 +21,7 @@ import pytest
 from requests.exceptions import BaseHTTPError
 
 from airflow.exceptions import AirflowException
-from airflow.kubernetes.pod_launcher import PodLauncher
+from airflow.kubernetes.pod_launcher import PodLauncher, PodStatus
 
 
 class TestPodLauncher(unittest.TestCase):
@@ -170,6 +170,22 @@ class TestPodLauncher(unittest.TestCase):
             ]
         )
 
+    def test_monitor_pod_empty_logs(self):
+        mock.sentinel.metadata = mock.MagicMock()
+        running_status = mock.MagicMock()
+        running_status.configure_mock(**{'name': 'base', 'state.running': True})
+        pod_info_running = mock.MagicMock(**{'status.container_statuses': [running_status]})
+        pod_info_succeeded = mock.MagicMock(**{'status.phase': PodStatus.SUCCEEDED})
+
+        def pod_state_gen():
+            yield pod_info_running
+            while True:
+                yield pod_info_succeeded
+
+        self.mock_kube_client.read_namespaced_pod.side_effect = pod_state_gen()
+        self.mock_kube_client.read_namespaced_pod_log.return_value = iter(())
+        self.pod_launcher.monitor_pod(mock.sentinel, get_logs=True)
+
     def test_read_pod_retries_fails(self):
         mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod.side_effect = [