Posted to commits@airflow.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/04/06 13:59:00 UTC

[jira] [Commented] (AIRFLOW-4526) KubernetesPodOperator gets stuck in Running state when get_logs is set to True and there is a long gap without logs from pod

    [ https://issues.apache.org/jira/browse/AIRFLOW-4526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315583#comment-17315583 ] 

ASF GitHub Bot commented on AIRFLOW-4526:
-----------------------------------------

nielstenboom commented on pull request #7428:
URL: https://github.com/apache/airflow/pull/7428#issuecomment-814143048


   > hi @smaley07,
   > 
   > You can use the code provided in this PR to make a custom pod launcher and operator. Then add the files to your plugin directory so that you can use the operator in your dags
   > 
   > for the pod operator:
   > 
   > ```python
   > from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
   > from airflow.exceptions import AirflowException
   > from airflow.contrib.kubernetes import kube_client, pod_generator
   > from utils.custom_pod_launcher import CustomPodLauncher
   > from airflow.utils.state import State
   > from airflow.version import version as airflow_version
   > 
   > 
   > class CustomPodOperator(KubernetesPodOperator):
   > 
   >     def execute(self, context):
   >         try:
   >             if self.in_cluster is not None:
   >                 client = kube_client.get_kube_client(in_cluster=self.in_cluster,
   >                                                      cluster_context=self.cluster_context,
   >                                                      config_file=self.config_file)
   >             else:
   >                 client = kube_client.get_kube_client(cluster_context=self.cluster_context,
   >                                                      config_file=self.config_file)
   > 
   >             # Add Airflow Version to the label
   >             # And a label to identify that pod is launched by KubernetesPodOperator
   >             self.labels.update(
   >                 {
   >                     'airflow_version': airflow_version.replace('+', '-'),
   >                     'kubernetes_pod_operator': 'True',
   >                 }
   >             )
   > 
   >             gen = pod_generator.PodGenerator()
   > 
   >             for port in self.ports:
   >                 gen.add_port(port)
   >             for mount in self.volume_mounts:
   >                 gen.add_mount(mount)
   >             for volume in self.volumes:
   >                 gen.add_volume(volume)
   > 
   >             pod = gen.make_pod(
   >                 namespace=self.namespace,
   >                 image=self.image,
   >                 pod_id=self.name,
   >                 cmds=self.cmds,
   >                 arguments=self.arguments,
   >                 labels=self.labels,
   >             )
   > 
   >             pod.service_account_name = self.service_account_name
   >             pod.secrets = self.secrets
   >             pod.envs = self.env_vars
   >             pod.image_pull_policy = self.image_pull_policy
   >             pod.image_pull_secrets = self.image_pull_secrets
   >             pod.annotations = self.annotations
   >             pod.resources = self.resources
   >             pod.affinity = self.affinity
   >             pod.node_selectors = self.node_selectors
   >             pod.hostnetwork = self.hostnetwork
   >             pod.tolerations = self.tolerations
   >             pod.configmaps = self.configmaps
   >             pod.security_context = self.security_context
   >             pod.pod_runtime_info_envs = self.pod_runtime_info_envs
   >             pod.dnspolicy = self.dnspolicy
   > 
   >             launcher = CustomPodLauncher(kube_client=client, extract_xcom=self.do_xcom_push)
   >             try:
   >                 (final_state, result) = launcher.run_pod(
   >                     pod,
   >                     startup_timeout=self.startup_timeout_seconds,
   >                     get_logs=self.get_logs)
   >             finally:
   >                 if self.is_delete_operator_pod:
   >                     launcher.delete_pod(pod)
   > 
   >             if final_state != State.SUCCESS:
   >                 raise AirflowException(
   >                     'Pod returned a failure: {state}'.format(state=final_state)
   >                 )
   >             if self.do_xcom_push:
   >                 return result
   >         except AirflowException as ex:
   >             raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
   > ```
   > 
   > and for the launcher:
   > 
   > ```python
   > from airflow.contrib.kubernetes.pod_launcher import PodLauncher
   > from requests.exceptions import BaseHTTPError
   > from airflow import AirflowException
   > from typing import Generator, List
   > from kubernetes.client.models.v1_pod import V1Pod
   > import math
   > import time
   > from datetime import datetime as dt
   > 
   > POD_LOGS_POLL_INTERVAL_SECONDS = 5
   > 
   > class CustomPodLauncher(PodLauncher):
   > 
   >     def _request_pod_log_chunk(self, pod: V1Pod, since_seconds: int) -> str:
   >         return self._client.read_namespaced_pod_log(
   >             name=pod.name,
   >             namespace=pod.namespace,
   >             container='base',
   >             follow=False,
   >             since_seconds=since_seconds,
   >             timestamps=True,
   >             _preload_content=False
   >         )
   > 
   >     def _read_pod_log_chunk(self, pod: V1Pod, last_line: bytes) -> Generator[bytes, None, None]:
   >         # The CoreV1Api doesn't support since_time even though the API does, so we must use
   >         # since_seconds. Add 15 seconds of buffer just in case of NTP woes
   >         if last_line:
   >             # Strip fractional part because strptime doesn't support nanosecond parsing
   >             timestamp = last_line.split(b" ", 1)[0]
   >             last_chunk_dt = dt.strptime(timestamp.split(b".", 1)[0].decode("utf-8"),
   >                                         "%Y-%m-%dT%H:%M:%S")
   >             since_time = last_chunk_dt
   >         else:
   >             since_time = dt.utcfromtimestamp(0)
   >         since_seconds = math.ceil((dt.utcnow() - since_time).total_seconds() + 15)
   >         resp = self._request_pod_log_chunk(pod, since_seconds)
   >         # If we've already read a chunk, skip until we find a matching line
   >         # Just in case since_seconds doesn't get everything we want, keep the previous lines in a buffer
   >         buffered_lines = []  # type: List[bytes]
   >         skipping_lines = True
   >         for line in resp:
   >             if skipping_lines:
   >                 if line == last_line:
   >                     self.log.debug("Found duplicate line. Stopping log skipping")
   >                     buffered_lines = []
   >                     skipping_lines = False
   >                 else:
   >                     buffered_lines.append(line)
   >             else:
   >                 yield line
   > 
   >         if buffered_lines:
   >             self.log.warning(
   >                 "End of previous log chunk not found in next chunk. May indicate log line loss"
   >             )
   >             for buffered_line in buffered_lines:
   >                 yield buffered_line
   > 
   >     def read_pod_logs(self, pod: V1Pod) -> Generator[bytes, None, None]:
   >         """
   >         Reads pod logs from the Kubernetes API until the pod stops.
   >         This explicitly does not use the `follow` parameter due to issues
   >         around log rotation
   >         (https://github.com/kubernetes/kubernetes/issues/28369). Once that is
   >         fixed, using follow instead of polling for pod status should be fine,
   >         but deduping on timestamp will still be desired in case the underlying
   >         request fails
   >         :param pod:
   >         :return:
   >         """
   > 
   >         # The timestamps returned from the Kubernetes API have nanosecond precision and
   >         # appear never to duplicate across lines, so we can use the timestamp plus the
   >         # line content to deduplicate log lines across multiple runs
   >         last_line = b""
   >         # We use a variable here instead of looping on self.pod_is_running so
   >         # that we can get one more read in the loop before breaking out
   >         pod_is_running = True
   > 
   >         try:
   >             while pod_is_running:
   >                 pod_is_running = self.base_container_is_running(pod)
   >                 if not pod_is_running:
   >                     self.log.info("pod stopped, pulling logs one more time")
   > 
   >                 for line in self._read_pod_log_chunk(pod, last_line):
   >                     timestamp, log_line = line.split(b" ", 1)
   >                     yield log_line
   >                     last_line = line
   > 
   >                 time.sleep(POD_LOGS_POLL_INTERVAL_SECONDS)
   >         except BaseHTTPError as e:
   >             raise AirflowException(
   >                 'There was an error reading the kubernetes API: {}'.format(e)
   >             )
   > ```
   
   Wow thank you so much!! This fixed the bug that has been bugging us for weeks.
   
    I had to make a few small adjustments to get it working with Airflow 1.10.15 (mainly the new import paths and a rewritten execute method). For those who also stumble upon this thread and are interested, here is my version (I put both classes in one file):
   
    the combined file:
   ```python
   
   # operator imports
   from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
   from airflow.exceptions import AirflowException
   from airflow.kubernetes import kube_client, pod_generator
   from airflow.utils.state import State
   from airflow.version import version as airflow_version
   
   # launcher imports
   from airflow.kubernetes.pod_launcher import PodLauncher
   from requests.exceptions import BaseHTTPError
   from typing import Generator, List
   from kubernetes.client.models.v1_pod import V1Pod
   import math
   import time
   from datetime import datetime as dt
   
   class CustomPodOperator(KubernetesPodOperator):
   
       def execute(self, context):
           try:
               if self.in_cluster is not None:
                   client = kube_client.get_kube_client(in_cluster=self.in_cluster,
                                                        cluster_context=self.cluster_context,
                                                        config_file=self.config_file)
               else:
                   client = kube_client.get_kube_client(cluster_context=self.cluster_context,
                                                        config_file=self.config_file)
   
               self.pod = self.create_pod_request_obj()
               self.namespace = self.pod.metadata.namespace
   
               self.client = client
   
               # Add combination of labels to uniquely identify a running pod
                labels = self.create_labels_for_pod(context)
   
               label_selector = self._get_pod_identifying_label_string(labels)
   
               pod_list = client.list_namespaced_pod(self.namespace, label_selector=label_selector)
   
               if len(pod_list.items) > 1 and self.reattach_on_restart:
                   raise AirflowException(
                       'More than one pod running with labels: '
                       '{label_selector}'.format(label_selector=label_selector))
   
               launcher = CustomPodLauncher(kube_client=client, extract_xcom=self.do_xcom_push)
   
               if len(pod_list.items) == 1:
                   try_numbers_match = self._try_numbers_match(context, pod_list.items[0])
                   final_state, result = self.handle_pod_overlap(
                       labels, try_numbers_match, launcher, pod_list.items[0]
                   )
               else:
                   final_state, _, result = self.create_new_pod_for_operator(labels, launcher)
               if final_state != State.SUCCESS:
                   raise AirflowException(
                       'Pod returned a failure: {state}'.format(state=final_state))
               return result
           except AirflowException as ex:
               raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
   
   """
   Custom pod launcher
   """
   
   POD_LOGS_POLL_INTERVAL_SECONDS = 5
   
   class CustomPodLauncher(PodLauncher):
   
       def _request_pod_log_chunk(self, pod: V1Pod, since_seconds: int) -> str:
           return self._client.read_namespaced_pod_log(
               name=pod.metadata.name,
               namespace=pod.metadata.namespace,
               container='base',
               follow=False,
               since_seconds=since_seconds,
               timestamps=True,
               _preload_content=False
           )
   
       def _read_pod_log_chunk(self, pod: V1Pod, last_line: bytes) -> Generator[bytes, None, None]:
           # The CoreV1Api doesn't support since_time even though the API does, so we must use
           # since_seconds. Add 15 seconds of buffer just in case of NTP woes
           if last_line:
               # Strip fractional part because strptime doesn't support nanosecond parsing
               timestamp = last_line.split(b" ", 1)[0]
               last_chunk_dt = dt.strptime(timestamp.split(b".", 1)[0].decode("utf-8"),
                                           "%Y-%m-%dT%H:%M:%S")
               since_time = last_chunk_dt
           else:
               since_time = dt.utcfromtimestamp(0)
           since_seconds = math.ceil((dt.utcnow() - since_time).total_seconds() + 15)
           resp = self._request_pod_log_chunk(pod, since_seconds)
           # If we've already read a chunk, skip until we find a matching line
           # Just in case since_seconds doesn't get everything we want, keep the previous lines in a buffer
           buffered_lines = []  # type: List[bytes]
           skipping_lines = True
           for line in resp:
               if skipping_lines:
                   if line == last_line:
                       self.log.debug("Found duplicate line. Stopping log skipping")
                       buffered_lines = []
                       skipping_lines = False
                   else:
                       buffered_lines.append(line)
               else:
                   yield line
   
           if buffered_lines:
               self.log.warning(
                   "End of previous log chunk not found in next chunk. May indicated log line loss"
               )
               for buffered_line in buffered_lines:
                   yield buffered_line
   
       def read_pod_logs(self, pod: V1Pod) -> Generator[bytes, None, None]:
           """
           Reads pod logs from the Kubernetes API until the pod stops.
           This explicitly does not use the `follow` parameter due to issues
           around log rotation
           (https://github.com/kubernetes/kubernetes/issues/28369). Once that is
           fixed, using follow instead of polling for pod status should be fine,
           but deduping on timestamp will still be desired in case the underlying
           request fails
           :param pod:
           :return:
           """
   
            # The timestamps returned from the Kubernetes API have nanosecond precision and
            # appear never to duplicate across lines, so we can use the timestamp plus the
            # line content to deduplicate log lines across multiple runs
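            # (Assumed example format) with timestamps=True, each raw line looks like
            #   b"2021-04-06T13:59:00.123456789Z actual log message\n"
            # i.e. an RFC3339Nano timestamp, a single space, then the original line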
           last_line = b""
           # We use a variable here instead of looping on self.pod_is_running so
           # that we can get one more read in the loop before breaking out
           pod_is_running = True
   
           try:
               while pod_is_running:
                   pod_is_running = self.base_container_is_running(pod)
                   if not pod_is_running:
                       self.log.info("pod stopped, pulling logs one more time")
   
                   for line in self._read_pod_log_chunk(pod, last_line):
                       timestamp, log_line = line.split(b" ", 1)
                       yield log_line
                       last_line = line
   
                   time.sleep(POD_LOGS_POLL_INTERVAL_SECONDS)
           except BaseHTTPError as e:
               raise AirflowException(
                   'There was an error reading the kubernetes API: {}'.format(e)
               )
   ```
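    
    For anyone wiring this up: as suggested in the quoted comment, the file goes into the plugins folder. Below is a minimal sketch of a DAG using the operator; the module name (plugins/custom_pod.py), the DAG id, image and command are placeholders for illustration, not part of the fix:
    
    ```python
    # Assumes the two classes above were saved as plugins/custom_pod.py;
    # in Airflow 1.10 the plugins folder is on sys.path, so the module
    # can be imported directly from a DAG file.
    from datetime import datetime
    
    from airflow import DAG
    from custom_pod import CustomPodOperator  # placeholder module name
    
    with DAG(
        dag_id="custom_pod_example",  # placeholder DAG id
        start_date=datetime(2021, 1, 1),
        schedule_interval=None,
    ) as dag:
        # A pod that stays silent for ten minutes: exactly the kind of log gap
        # that made the stock KubernetesPodOperator hang with get_logs=True.
        quiet_task = CustomPodOperator(
            task_id="long_quiet_task",
            name="long-quiet-task",
            namespace="default",
            image="python:3.8-slim",
            cmds=["python", "-c"],
            arguments=["import time; print('start'); time.sleep(600); print('done')"],
            get_logs=True,
            in_cluster=True,
            is_delete_operator_pod=True,
        )
    ```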




> KubernetesPodOperator gets stuck in Running state when get_logs is set to True and there is a long gap without logs from pod
> ----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-4526
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4526
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: operators
>         Environment: Azure Kubernetes Service cluster with Airflow based on puckel/docker-airflow
>            Reporter: Christian Lellmann
>            Priority: Major
>              Labels: kubernetes
>             Fix For: 2.0.0
>
>
> When the `get_logs` parameter of the KubernetesPodOperator is set to True, the operator task gets stuck in the Running state if the pod run by the task (in_cluster mode) writes some logs, then stops logging for a longer time (a few minutes) before continuing. The later log output is no longer fetched and the pod state is no longer checked, so the completion of the pod isn't recognized and the task never finishes.
>  
> Assumption:
> In the `monitor_pod` method of the pod launcher ([https://github.com/apache/airflow/blob/master/airflow/kubernetes/pod_launcher.py#L97]), the `read_namespaced_pod_log` call of the kubernetes client apparently gets stuck in the `follow=True` stream ([https://github.com/apache/airflow/blob/master/airflow/kubernetes/pod_launcher.py#L108]): after a period without logs from the pod, the stream stops forwarding subsequent log lines.
> So the `pod_launcher` never reaches the later pod state checks ([https://github.com/apache/airflow/blob/master/airflow/kubernetes/pod_launcher.py#L118]) and never sees the completed state -> the task sticks in Running.
> When the `get_logs` parameter is disabled, everything works because the log stream is skipped.
>  
> Suggestion:
> Poll the logs actively, without the `follow` parameter set to True, in parallel with the pod state checking.
> That way the logs can be fetched without the described connection problem while the pod state is checked concurrently, so the end states of the pods are reliably recognized.
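
For context, the assumption above boils down to the pattern below: a simplified sketch (condensed for illustration, not the literal upstream pod_launcher code) of why a follow=True stream can wedge the task.

```python
# Simplified sketch: with follow=True the client holds a single long-lived
# HTTP stream. If the kubelet rotates the log file, or the connection goes
# quiet for long enough, the iterator can block indefinitely, so the pod
# state check after the loop is never reached and the task stays Running.
from kubernetes import client, config

config.load_incluster_config()  # assumes in-cluster credentials
v1 = client.CoreV1Api()

logs = v1.read_namespaced_pod_log(
    name='my-pod',          # hypothetical pod name
    namespace='default',
    container='base',
    follow=True,            # the problematic setting
    _preload_content=False,
)
for line in logs:           # can hang here forever on a quiet pod
    print(line)

# Only reached once the stream ends; never, if the stream hangs.
print("pod finished, state can now be checked")
```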


