You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by di...@apache.org on 2020/10/09 23:00:26 UTC
[airflow] branch master updated: KubernetesPodOperator should retry
log tailing in case of interruption (#11325)
This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new b7404b0 KubernetesPodOperator should retry log tailing in case of interruption (#11325)
b7404b0 is described below
commit b7404b079ab57b6493d8ddd319bccdb40ff3ddc5
Author: Michał Misiewicz <mi...@gmail.com>
AuthorDate: Sat Oct 10 00:59:47 2020 +0200
KubernetesPodOperator should retry log tailing in case of interruption (#11325)
* KubernetesPodOperator can retry log tailing in case of interruption
* fix failing test
* change read_pod_logs method formatting
* KubernetesPodOperator retry log tailing based on last read log timestamp
* fix test_parse_log_line test formatting
* add docstring to parse_log_line method
* fix kubernetes integration test
---
airflow/kubernetes/pod_launcher.py | 56 +++++++++++++++++++++---
kubernetes_tests/test_kubernetes_pod_operator.py | 2 +-
tests/kubernetes/test_pod_launcher.py | 42 +++++++++++++++---
3 files changed, 88 insertions(+), 12 deletions(-)
diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py
index 06f5b2e..8a38716 100644
--- a/airflow/kubernetes/pod_launcher.py
+++ b/airflow/kubernetes/pod_launcher.py
@@ -16,10 +16,12 @@
# under the License.
"""Launches PODs"""
import json
+import math
import time
from datetime import datetime as dt
from typing import Optional, Tuple
+import pendulum
import tenacity
from kubernetes import client, watch
from kubernetes.client.models.v1_pod import V1Pod
@@ -125,9 +127,23 @@ class PodLauncher(LoggingMixin):
:return: Tuple[State, Optional[str]]
"""
if get_logs:
- logs = self.read_pod_logs(pod)
- for line in logs:
- self.log.info(line)
+ read_logs_since_sec = None
+ last_log_time = None
+ while True:
+ logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec)
+ for line in logs:
+ timestamp, message = self.parse_log_line(line.decode('utf-8'))
+ last_log_time = pendulum.parse(timestamp)
+ self.log.info(message)
+ time.sleep(1)
+
+ if not self.base_container_is_running(pod):
+ break
+
+ self.log.warning('Pod %s log read interrupted', pod.metadata.name)
+ delta = pendulum.now() - last_log_time
+ # Prefer logs duplication rather than loss
+ read_logs_since_sec = math.ceil(delta.total_seconds())
result = None
if self.extract_xcom:
while self.base_container_is_running(pod):
@@ -141,6 +157,22 @@ class PodLauncher(LoggingMixin):
time.sleep(2)
return self._task_status(self.read_pod(pod)), result
+ def parse_log_line(self, line: str) -> Tuple[str, str]:
+ """
+ Parse K8s log line and returns the final state
+
+ :param line: k8s log line
+ :type line: str
+ :return: timestamp and log message
+ :rtype: Tuple[str, str]
+ """
+ split_at = line.find(' ')
+ if split_at == -1:
+ raise Exception('Log not in "{{timestamp}} {{log}}" format. Got: {}'.format(line))
+ timestamp = line[:split_at]
+ message = line[split_at + 1:].rstrip()
+ return timestamp, message
+
def _task_status(self, event):
self.log.info(
'Event: %s had an event of type %s',
@@ -172,16 +204,28 @@ class PodLauncher(LoggingMixin):
wait=tenacity.wait_exponential(),
reraise=True
)
- def read_pod_logs(self, pod: V1Pod, tail_lines: int = 10):
+ def read_pod_logs(self,
+ pod: V1Pod,
+ tail_lines: Optional[int] = None,
+ timestamps: bool = False,
+ since_seconds: Optional[int] = None):
"""Reads log from the POD"""
+ additional_kwargs = {}
+ if since_seconds:
+ additional_kwargs['since_seconds'] = since_seconds
+
+ if tail_lines:
+ additional_kwargs['tail_lines'] = tail_lines
+
try:
return self._client.read_namespaced_pod_log(
name=pod.metadata.name,
namespace=pod.metadata.namespace,
container='base',
follow=True,
- tail_lines=tail_lines,
- _preload_content=False
+ timestamps=timestamps,
+ _preload_content=False,
+ **additional_kwargs
)
except BaseHTTPError as e:
raise AirflowException(
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index f7184e5..682eada 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -414,7 +414,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
)
context = create_context(k)
k.execute(context=context)
- mock_logger.info.assert_any_call(b"retrieved from mount\n")
+ mock_logger.info.assert_any_call('retrieved from mount')
actual_pod = self.api_client.sanitize_for_serialization(k.pod)
self.expected_pod['spec']['containers'][0]['args'] = args
self.expected_pod['spec']['containers'][0]['volumeMounts'] = [{
diff --git a/tests/kubernetes/test_pod_launcher.py b/tests/kubernetes/test_pod_launcher.py
index be3812b..9c35f4b 100644
--- a/tests/kubernetes/test_pod_launcher.py
+++ b/tests/kubernetes/test_pod_launcher.py
@@ -48,17 +48,17 @@ class TestPodLauncher(unittest.TestCase):
_preload_content=False,
container='base',
follow=True,
+ timestamps=False,
name=mock.sentinel.metadata.name,
- namespace=mock.sentinel.metadata.namespace,
- tail_lines=10
+ namespace=mock.sentinel.metadata.namespace
),
mock.call(
_preload_content=False,
container='base',
follow=True,
+ timestamps=False,
name=mock.sentinel.metadata.name,
- namespace=mock.sentinel.metadata.namespace,
- tail_lines=10
+ namespace=mock.sentinel.metadata.namespace
)
])
@@ -80,19 +80,39 @@ class TestPodLauncher(unittest.TestCase):
self.mock_kube_client.read_namespaced_pod_log.side_effect = [
mock.sentinel.logs
]
- logs = self.pod_launcher.read_pod_logs(mock.sentinel, 100)
+ logs = self.pod_launcher.read_pod_logs(mock.sentinel, tail_lines=100)
self.assertEqual(mock.sentinel.logs, logs)
self.mock_kube_client.read_namespaced_pod_log.assert_has_calls([
mock.call(
_preload_content=False,
container='base',
follow=True,
+ timestamps=False,
name=mock.sentinel.metadata.name,
namespace=mock.sentinel.metadata.namespace,
tail_lines=100
),
])
+ def test_read_pod_logs_successfully_with_since_seconds(self):
+ mock.sentinel.metadata = mock.MagicMock()
+ self.mock_kube_client.read_namespaced_pod_log.side_effect = [
+ mock.sentinel.logs
+ ]
+ logs = self.pod_launcher.read_pod_logs(mock.sentinel, since_seconds=2)
+ self.assertEqual(mock.sentinel.logs, logs)
+ self.mock_kube_client.read_namespaced_pod_log.assert_has_calls([
+ mock.call(
+ _preload_content=False,
+ container='base',
+ follow=True,
+ timestamps=False,
+ name=mock.sentinel.metadata.name,
+ namespace=mock.sentinel.metadata.namespace,
+ since_seconds=2
+ ),
+ ])
+
def test_read_pod_events_successfully_returns_events(self):
mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.list_namespaced_event.return_value = mock.sentinel.events
@@ -162,3 +182,15 @@ class TestPodLauncher(unittest.TestCase):
self.pod_launcher.read_pod,
mock.sentinel
)
+
+ def test_parse_log_line(self):
+ timestamp, message = \
+ self.pod_launcher.parse_log_line('2020-10-08T14:16:17.793417674Z Valid message\n')
+
+ self.assertEqual(timestamp, '2020-10-08T14:16:17.793417674Z')
+ self.assertEqual(message, 'Valid message')
+
+ self.assertRaises(
+ Exception,
+ self.pod_launcher.parse_log_line('2020-10-08T14:16:17.793417674ZInvalid message\n'),
+ )