You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/06 13:59:07 UTC

[GitHub] [airflow] nielstenboom edited a comment on pull request #7428: [AIRFLOW-4526] Poll k8s logs to avoid problem with kubernetes logs --follow

nielstenboom edited a comment on pull request #7428:
URL: https://github.com/apache/airflow/pull/7428#issuecomment-814143048


   > hi @smaley07,
   > 
   > You can use the code provided in this PR to make a custom pod launcher and operator. Then add the files to your plugin directory so that you can use the operator in your dags
   > 
   > for the pod operator:
   > 
   > ```
   > from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
   > from airflow.exceptions import AirflowException
   > from airflow.contrib.kubernetes import kube_client, pod_generator
   > from utils.custom_pod_launcher import CustomPodLauncher
   > from airflow.utils.state import State
   > from airflow.version import version as airflow_version
   > 
   > 
   > class CustomPodOperator(KubernetesPodOperator):
   > 
   >     def execute(self, context):
   >         try:
   >             if self.in_cluster is not None:
   >                 client = kube_client.get_kube_client(in_cluster=self.in_cluster,
   >                                                      cluster_context=self.cluster_context,
   >                                                      config_file=self.config_file)
   >             else:
   >                 client = kube_client.get_kube_client(cluster_context=self.cluster_context,
   >                                                      config_file=self.config_file)
   > 
   >             # Add Airflow Version to the label
   >             # And a label to identify that pod is launched by KubernetesPodOperator
   >             self.labels.update(
   >                 {
   >                     'airflow_version': airflow_version.replace('+', '-'),
   >                     'kubernetes_pod_operator': 'True',
   >                 }
   >             )
   > 
   >             gen = pod_generator.PodGenerator()
   > 
   >             for port in self.ports:
   >                 gen.add_port(port)
   >             for mount in self.volume_mounts:
   >                 gen.add_mount(mount)
   >             for volume in self.volumes:
   >                 gen.add_volume(volume)
   > 
   >             pod = gen.make_pod(
   >                 namespace=self.namespace,
   >                 image=self.image,
   >                 pod_id=self.name,
   >                 cmds=self.cmds,
   >                 arguments=self.arguments,
   >                 labels=self.labels,
   >             )
   > 
   >             pod.service_account_name = self.service_account_name
   >             pod.secrets = self.secrets
   >             pod.envs = self.env_vars
   >             pod.image_pull_policy = self.image_pull_policy
   >             pod.image_pull_secrets = self.image_pull_secrets
   >             pod.annotations = self.annotations
   >             pod.resources = self.resources
   >             pod.affinity = self.affinity
   >             pod.node_selectors = self.node_selectors
   >             pod.hostnetwork = self.hostnetwork
   >             pod.tolerations = self.tolerations
   >             pod.configmaps = self.configmaps
   >             pod.security_context = self.security_context
   >             pod.pod_runtime_info_envs = self.pod_runtime_info_envs
   >             pod.dnspolicy = self.dnspolicy
   > 
   >             launcher = CustomPodLauncher(kube_client=client,
   >                                                 extract_xcom=self.do_xcom_push)
   >             try:
   >                 (final_state, result) = launcher.run_pod(
   >                     pod,
   >                     startup_timeout=self.startup_timeout_seconds,
   >                     get_logs=self.get_logs)
   >             finally:
   >                 if self.is_delete_operator_pod:
   >                     launcher.delete_pod(pod)
   > 
   >             if final_state != State.SUCCESS:
   >                 raise AirflowException(
   >                     'Pod returned a failure: {state}'.format(state=final_state)
   >                 )
   >             if self.do_xcom_push:
   >                 return result
   >         except AirflowException as ex:
   >             raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
   > ```
   > 
   > and for the launcher:
   > 
   > ```
   > from airflow.contrib.kubernetes.pod_launcher import PodLauncher
   > from requests.exceptions import BaseHTTPError
   > from airflow import AirflowException
   > from typing import Generator, List
   > from kubernetes.client.models.v1_pod import V1Pod
   > import math
   > import time
   > from datetime import datetime as dt
   > 
   > POD_LOGS_POLL_INTERVAL_SECONDS = 5
   > 
   > class CustomPodLauncher(PodLauncher):
   > 
   >     def _request_pod_log_chunk(self, pod: V1Pod, since_seconds: int) -> str:
   >         return self._client.read_namespaced_pod_log(
   >             name=pod.name,
   >             namespace=pod.namespace,
   >             container='base',
   >             follow=False,
   >             since_seconds=since_seconds,
   >             timestamps=True,
   >             _preload_content=False
   >         )
   > 
   >     def _read_pod_log_chunk(self, pod: V1Pod, last_line: bytes) -> Generator[bytes, None, None]:
   >         # The CoreV1Api doesn't support since_time even though the API does, so we must use
   >         # since_seconds. Add 15 seconds of buffer just in case of NTP woes
   >         if last_line:
   >             # Strip fractional part because strptime doesn't support nanosecond parsing
   >             timestamp = last_line.split(b" ", 1)[0]
   >             last_chunk_dt = dt.strptime(timestamp.split(b".", 1)[0].decode("utf-8"),
   >                                         "%Y-%m-%dT%H:%M:%S")
   >             since_time = last_chunk_dt
   >         else:
   >             since_time = dt.utcfromtimestamp(0)
   >         since_seconds = math.ceil((dt.utcnow() - since_time).total_seconds() + 15)
   >         resp = self._request_pod_log_chunk(pod, since_seconds)
   >         # If we've already read a chunk, skip until we find a matching line
   >         # Just in case since_seconds doesn't get everything we want, keep the previous lines in a buffer
   >         buffered_lines = []  # type: List[bytes]
   >         skipping_lines = True
   >         for line in resp:
   >             if skipping_lines:
   >                 if line == last_line:
   >                     self.log.debug("Found duplicate line. Stopping log skipping")
   >                     buffered_lines = []
   >                     skipping_lines = False
   >                 else:
   >                     buffered_lines.append(line)
   >             else:
   >                 yield line
   > 
   >         if buffered_lines:
   >             self.log.warning(
   >                 "End of previous log chunk not found in next chunk. May indicated log line loss"
   >             )
   >             for buffered_line in buffered_lines:
   >                 yield buffered_line
   > 
   >     def read_pod_logs(self, pod: V1Pod) -> Generator[bytes, None, None]:
   >         """
   >         Reads pod logs from the Kubernetes API until the pod stops.
   >         This explicitly does not use the `follow` parameter due to issues
   >         around log rotation
   >         (https://github.com/kubernetes/kubernetes/issues/28369). Once that is
   >         fixed, using follow instead of polling for pod status should be fine,
   >         but deduping on timestamp will still be desired in case the underlying
   >         request fails
   >         :param pod:
   >         :return:
   >         """
   > 
   >         # The timestamps returned from the Kubernetes API are in nanoseconds, and appear
   >         # to never duplicate across lines so we can use the timestamp plus the line
   >         # content to deduplicate log lines across multiple runs
   >         last_line = b""
   >         # We use a variable here instead of looping on self.pod_is_running so
   >         # that we can get one more read in the loop before breaking out
   >         pod_is_running = True
   > 
   >         try:
   >             while pod_is_running:
   >                 pod_is_running = self.base_container_is_running(pod)
   >                 if not pod_is_running:
   >                     self.log.info("pod stopped, pulling logs one more time")
   > 
   >                 for line in self._read_pod_log_chunk(pod, last_line):
   >                     timestamp, log_line = line.split(b" ", 1)
   >                     yield log_line
   >                     last_line = line
   > 
   >                 time.sleep(POD_LOGS_POLL_INTERVAL_SECONDS)
   >         except BaseHTTPError as e:
   >             raise AirflowException(
   >                 'There was an error reading the kubernetes API: {}'.format(e)
   >             )
   > ```
   
   Wow thank you so much!! This fixed the bug that has been bugging us for weeks.
   
   I've had to make a few small adjustments for it to work with airflow 1.10.15, for those who also stumble upon this thread and are interested (I put both classes in one file):
   
   payched_pod_operator.py: 
   ```python
   
   # operator imports
   from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
   from airflow.exceptions import AirflowException
   from airflow.kubernetes import kube_client, pod_generator
   from airflow.utils.state import State
   from airflow.version import version as airflow_version
   
   # launcher imports
   from airflow.kubernetes.pod_launcher import PodLauncher
   from requests.exceptions import BaseHTTPError
   from airflow import AirflowException
   from typing import Generator, List
   from kubernetes.client.models.v1_pod import V1Pod
   import math
   import time
   from datetime import datetime as dt
   
   class CustomPodOperator(KubernetesPodOperator):
   
       def execute(self, context):
           try:
               if self.in_cluster is not None:
                   client = kube_client.get_kube_client(in_cluster=self.in_cluster,
                                                        cluster_context=self.cluster_context,
                                                        config_file=self.config_file)
               else:
                   client = kube_client.get_kube_client(cluster_context=self.cluster_context,
                                                        config_file=self.config_file)
   
               self.pod = self.create_pod_request_obj()
               self.namespace = self.pod.metadata.namespace
   
               self.client = client
   
               # Add combination of labels to uniquely identify a running pod
               labels = self.create_labels_for_pod(context)
   
               self.pod = self.create_pod_request_obj()
               self.namespace = self.pod.metadata.namespace
   
               label_selector = self._get_pod_identifying_label_string(labels)
   
               pod_list = client.list_namespaced_pod(self.namespace, label_selector=label_selector)
   
               if len(pod_list.items) > 1 and self.reattach_on_restart:
                   raise AirflowException(
                       'More than one pod running with labels: '
                       '{label_selector}'.format(label_selector=label_selector))
   
               launcher = CustomPodLauncher(kube_client=client, extract_xcom=self.do_xcom_push)
   
               if len(pod_list.items) == 1:
                   try_numbers_match = self._try_numbers_match(context, pod_list.items[0])
                   final_state, result = self.handle_pod_overlap(
                       labels, try_numbers_match, launcher, pod_list.items[0]
                   )
               else:
                   final_state, _, result = self.create_new_pod_for_operator(labels, launcher)
               if final_state != State.SUCCESS:
                   raise AirflowException(
                       'Pod returned a failure: {state}'.format(state=final_state))
               return result
           except AirflowException as ex:
               raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
   
   """
   Custom pod launcher
   """
   
   POD_LOGS_POLL_INTERVAL_SECONDS = 5
   
   class CustomPodLauncher(PodLauncher):
   
       def _request_pod_log_chunk(self, pod: V1Pod, since_seconds: int) -> str:
           return self._client.read_namespaced_pod_log(
               name=pod.metadata.name,
               namespace=pod.metadata.namespace,
               container='base',
               follow=False,
               since_seconds=since_seconds,
               timestamps=True,
               _preload_content=False
           )
   
       def _read_pod_log_chunk(self, pod: V1Pod, last_line: bytes) -> Generator[bytes, None, None]:
           # The CoreV1Api doesn't support since_time even though the API does, so we must use
           # since_seconds. Add 15 seconds of buffer just in case of NTP woes
           if last_line:
               # Strip fractional part because strptime doesn't support nanosecond parsing
               timestamp = last_line.split(b" ", 1)[0]
               last_chunk_dt = dt.strptime(timestamp.split(b".", 1)[0].decode("utf-8"),
                                           "%Y-%m-%dT%H:%M:%S")
               since_time = last_chunk_dt
           else:
               since_time = dt.utcfromtimestamp(0)
           since_seconds = math.ceil((dt.utcnow() - since_time).total_seconds() + 15)
           resp = self._request_pod_log_chunk(pod, since_seconds)
           # If we've already read a chunk, skip until we find a matching line
           # Just in case since_seconds doesn't get everything we want, keep the previous lines in a buffer
           buffered_lines = []  # type: List[bytes]
           skipping_lines = True
           for line in resp:
               if skipping_lines:
                   if line == last_line:
                       self.log.debug("Found duplicate line. Stopping log skipping")
                       buffered_lines = []
                       skipping_lines = False
                   else:
                       buffered_lines.append(line)
               else:
                   yield line
   
           if buffered_lines:
               self.log.warning(
                   "End of previous log chunk not found in next chunk. May indicated log line loss"
               )
               for buffered_line in buffered_lines:
                   yield buffered_line
   
       def read_pod_logs(self, pod: V1Pod) -> Generator[bytes, None, None]:
           """
           Reads pod logs from the Kubernetes API until the pod stops.
           This explicitly does not use the `follow` parameter due to issues
           around log rotation
           (https://github.com/kubernetes/kubernetes/issues/28369). Once that is
           fixed, using follow instead of polling for pod status should be fine,
           but deduping on timestamp will still be desired in case the underlying
           request fails
           :param pod:
           :return:
           """
   
           # The timestamps returned from the Kubernetes API are in nanoseconds, and appear
           # to never duplicate across lines so we can use the timestamp plus the line
           # content to deduplicate log lines across multiple runs
           last_line = b""
           # We use a variable here instead of looping on self.pod_is_running so
           # that we can get one more read in the loop before breaking out
           pod_is_running = True
   
           try:
               while pod_is_running:
                   pod_is_running = self.base_container_is_running(pod)
                   if not pod_is_running:
                       self.log.info("pod stopped, pulling logs one more time")
   
                   for line in self._read_pod_log_chunk(pod, last_line):
                       timestamp, log_line = line.split(b" ", 1)
                       yield log_line
                       last_line = line
   
                   time.sleep(POD_LOGS_POLL_INTERVAL_SECONDS)
           except BaseHTTPError as e:
               raise AirflowException(
                   'There was an error reading the kubernetes API: {}'.format(e)
               )
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org