You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by di...@apache.org on 2020/10/09 23:00:26 UTC

[airflow] branch master updated: KubernetesPodOperator should retry log tailing in case of interruption (#11325)

This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new b7404b0  KubernetesPodOperator should retry log tailing in case of interruption (#11325)
b7404b0 is described below

commit b7404b079ab57b6493d8ddd319bccdb40ff3ddc5
Author: Michał Misiewicz <mi...@gmail.com>
AuthorDate: Sat Oct 10 00:59:47 2020 +0200

    KubernetesPodOperator should retry log tailing in case of interruption (#11325)
    
    * KubernetesPodOperator can retry log tailing in case of interruption
    
    * fix failing test
    
    * change read_pod_logs method formatting
    
    * KubernetesPodOperator retry log tailing based on last read log timestamp
    
    * fix test_parse_log_line test  formatting
    
    * add docstring to parse_log_line method
    
    * fix kubernetes integration test
---
 airflow/kubernetes/pod_launcher.py               | 56 +++++++++++++++++++++---
 kubernetes_tests/test_kubernetes_pod_operator.py |  2 +-
 tests/kubernetes/test_pod_launcher.py            | 42 +++++++++++++++---
 3 files changed, 88 insertions(+), 12 deletions(-)

diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py
index 06f5b2e..8a38716 100644
--- a/airflow/kubernetes/pod_launcher.py
+++ b/airflow/kubernetes/pod_launcher.py
@@ -16,10 +16,12 @@
 # under the License.
 """Launches PODs"""
 import json
+import math
 import time
 from datetime import datetime as dt
 from typing import Optional, Tuple
 
+import pendulum
 import tenacity
 from kubernetes import client, watch
 from kubernetes.client.models.v1_pod import V1Pod
@@ -125,9 +127,23 @@ class PodLauncher(LoggingMixin):
         :return:  Tuple[State, Optional[str]]
         """
         if get_logs:
-            logs = self.read_pod_logs(pod)
-            for line in logs:
-                self.log.info(line)
+            read_logs_since_sec = None
+            last_log_time = None
+            while True:
+                logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec)
+                for line in logs:
+                    timestamp, message = self.parse_log_line(line.decode('utf-8'))
+                    last_log_time = pendulum.parse(timestamp)
+                    self.log.info(message)
+                time.sleep(1)
+
+                if not self.base_container_is_running(pod):
+                    break
+
+                self.log.warning('Pod %s log read interrupted', pod.metadata.name)
+                delta = pendulum.now() - last_log_time
+                # Prefer logs duplication rather than loss
+                read_logs_since_sec = math.ceil(delta.total_seconds())
         result = None
         if self.extract_xcom:
             while self.base_container_is_running(pod):
@@ -141,6 +157,22 @@ class PodLauncher(LoggingMixin):
             time.sleep(2)
         return self._task_status(self.read_pod(pod)), result
 
+    def parse_log_line(self, line: str) -> Tuple[str, str]:
+        """
+        Parse K8s log line and returns the final state
+
+        :param line: k8s log line
+        :type line: str
+        :return: timestamp and log message
+        :rtype: Tuple[str, str]
+        """
+        split_at = line.find(' ')
+        if split_at == -1:
+            raise Exception('Log not in "{{timestamp}} {{log}}" format. Got: {}'.format(line))
+        timestamp = line[:split_at]
+        message = line[split_at + 1:].rstrip()
+        return timestamp, message
+
     def _task_status(self, event):
         self.log.info(
             'Event: %s had an event of type %s',
@@ -172,16 +204,28 @@ class PodLauncher(LoggingMixin):
         wait=tenacity.wait_exponential(),
         reraise=True
     )
-    def read_pod_logs(self, pod: V1Pod, tail_lines: int = 10):
+    def read_pod_logs(self,
+                      pod: V1Pod,
+                      tail_lines: Optional[int] = None,
+                      timestamps: bool = False,
+                      since_seconds: Optional[int] = None):
         """Reads log from the POD"""
+        additional_kwargs = {}
+        if since_seconds:
+            additional_kwargs['since_seconds'] = since_seconds
+
+        if tail_lines:
+            additional_kwargs['tail_lines'] = tail_lines
+
         try:
             return self._client.read_namespaced_pod_log(
                 name=pod.metadata.name,
                 namespace=pod.metadata.namespace,
                 container='base',
                 follow=True,
-                tail_lines=tail_lines,
-                _preload_content=False
+                timestamps=timestamps,
+                _preload_content=False,
+                **additional_kwargs
             )
         except BaseHTTPError as e:
             raise AirflowException(
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index f7184e5..682eada 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -414,7 +414,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             )
             context = create_context(k)
             k.execute(context=context)
-            mock_logger.info.assert_any_call(b"retrieved from mount\n")
+            mock_logger.info.assert_any_call('retrieved from mount')
             actual_pod = self.api_client.sanitize_for_serialization(k.pod)
             self.expected_pod['spec']['containers'][0]['args'] = args
             self.expected_pod['spec']['containers'][0]['volumeMounts'] = [{
diff --git a/tests/kubernetes/test_pod_launcher.py b/tests/kubernetes/test_pod_launcher.py
index be3812b..9c35f4b 100644
--- a/tests/kubernetes/test_pod_launcher.py
+++ b/tests/kubernetes/test_pod_launcher.py
@@ -48,17 +48,17 @@ class TestPodLauncher(unittest.TestCase):
                 _preload_content=False,
                 container='base',
                 follow=True,
+                timestamps=False,
                 name=mock.sentinel.metadata.name,
-                namespace=mock.sentinel.metadata.namespace,
-                tail_lines=10
+                namespace=mock.sentinel.metadata.namespace
             ),
             mock.call(
                 _preload_content=False,
                 container='base',
                 follow=True,
+                timestamps=False,
                 name=mock.sentinel.metadata.name,
-                namespace=mock.sentinel.metadata.namespace,
-                tail_lines=10
+                namespace=mock.sentinel.metadata.namespace
             )
         ])
 
@@ -80,19 +80,39 @@ class TestPodLauncher(unittest.TestCase):
         self.mock_kube_client.read_namespaced_pod_log.side_effect = [
             mock.sentinel.logs
         ]
-        logs = self.pod_launcher.read_pod_logs(mock.sentinel, 100)
+        logs = self.pod_launcher.read_pod_logs(mock.sentinel, tail_lines=100)
         self.assertEqual(mock.sentinel.logs, logs)
         self.mock_kube_client.read_namespaced_pod_log.assert_has_calls([
             mock.call(
                 _preload_content=False,
                 container='base',
                 follow=True,
+                timestamps=False,
                 name=mock.sentinel.metadata.name,
                 namespace=mock.sentinel.metadata.namespace,
                 tail_lines=100
             ),
         ])
 
+    def test_read_pod_logs_successfully_with_since_seconds(self):
+        mock.sentinel.metadata = mock.MagicMock()
+        self.mock_kube_client.read_namespaced_pod_log.side_effect = [
+            mock.sentinel.logs
+        ]
+        logs = self.pod_launcher.read_pod_logs(mock.sentinel, since_seconds=2)
+        self.assertEqual(mock.sentinel.logs, logs)
+        self.mock_kube_client.read_namespaced_pod_log.assert_has_calls([
+            mock.call(
+                _preload_content=False,
+                container='base',
+                follow=True,
+                timestamps=False,
+                name=mock.sentinel.metadata.name,
+                namespace=mock.sentinel.metadata.namespace,
+                since_seconds=2
+            ),
+        ])
+
     def test_read_pod_events_successfully_returns_events(self):
         mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.list_namespaced_event.return_value = mock.sentinel.events
@@ -162,3 +182,15 @@ class TestPodLauncher(unittest.TestCase):
             self.pod_launcher.read_pod,
             mock.sentinel
         )
+
+    def test_parse_log_line(self):
+        timestamp, message = \
+            self.pod_launcher.parse_log_line('2020-10-08T14:16:17.793417674Z Valid message\n')
+
+        self.assertEqual(timestamp, '2020-10-08T14:16:17.793417674Z')
+        self.assertEqual(message, 'Valid message')
+
+        self.assertRaises(
+            Exception,
+            self.pod_launcher.parse_log_line('2020-10-08T14:16:17.793417674ZInvalid message\n'),
+        )