You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/10/28 15:31:23 UTC

[GitHub] [airflow] obrie commented on issue #11629: KubernetesExecutor. Pod log do not contains all data from task log file

obrie commented on issue #11629:
URL: https://github.com/apache/airflow/issues/11629#issuecomment-718013495


   I'm not sure this is the right solution, but for what it's worth, here is the patch I've applied to my Airflow cluster to make both pod logs and task logs visible for running tasks:
   
   ```python
   from airflow.configuration import conf
   from airflow.utils.log.file_task_handler import FileTaskHandler
   import os
   # Capture the unpatched reader before it is replaced, so the pod-aware
   # _read can delegate to it whenever the pod fallback does not apply.
   original__read = getattr(FileTaskHandler, '_read')
   
   
   def _read(self, ti, try_number, metadata=None):
       """Read task logs, falling back to the live worker pod under KubernetesExecutor.

       If the rendered log file is missing from the local filesystem and the
       configured executor is ``KubernetesExecutor``, fetch the last 100 lines of
       the pod's ``base`` container log plus the task log file (read with ``cat``
       executed inside the pod), and return them concatenated. In every other
       case, delegate to the original ``FileTaskHandler._read``.

       :param ti: task instance whose logs are requested; ``ti.hostname`` is
           used as the pod name and is NOT modified.
       :param try_number: attempt number used to render the log filename.
       :param metadata: forwarded to the original reader; ignored on the
           pod-fallback path.
       :return: ``(log_text, metadata_dict)``. The pod-fallback path always
           returns ``{'end_of_log': True}``, and any failure while talking to
           the cluster is reported inline in ``log_text`` instead of raised.
       """
       log_relative_path = self._render_filename(ti, try_number)
       location = os.path.join(self.local_base, log_relative_path)

       # Guard clause: use the stock reader unless the file is absent locally
       # AND tasks run in Kubernetes pods.
       if os.path.exists(location) or conf.get('core', 'executor') != 'KubernetesExecutor':
           return original__read(self, ti, try_number, metadata)

       # Work on a local copy so that reading logs never mutates the task instance.
       pod_name = ti.hostname
       # Collect chunks and join once, instead of repeatedly growing a string.
       parts = ['*** Trying to get logs from worker pod {} ***\n\n'.format(pod_name)]

       try:
           from airflow.kubernetes.kube_client import get_kube_client
           from kubernetes.stream import stream as kubernetes_stream

           namespace = conf.get('kubernetes', 'namespace')
           kube_client = get_kube_client()

           # Hostnames of 63+ characters may be a truncated prefix of the real
           # pod name. If exactly one pod in the namespace has a longer name
           # starting with that prefix, use it as the pod to query.
           if pod_name and len(pod_name) >= 63:
               pods = kube_client.list_namespaced_pod(namespace)
               matches = [
                   p.metadata.name for p in pods.items if p.metadata.name.startswith(pod_name)
               ]
               if len(matches) == 1 and len(matches[0]) > len(pod_name):
                   pod_name = matches[0]

           # Pod (container stdout/stderr) logs: the last 100 lines of 'base'.
           res = kube_client.read_namespaced_pod_log(
               name=pod_name,
               namespace=namespace,
               container='base',
               follow=False,
               tail_lines=100,
               _preload_content=False,
           )
           try:
               for line in res:
                   # errors='replace' stops a single undecodable byte from
                   # discarding the whole fetch via the except clause below.
                   parts.append(line.decode('utf-8', errors='replace'))
           finally:
               # With _preload_content=False the raw HTTP response is handed
               # back unconsumed, so close it explicitly to avoid leaking the
               # connection.
               res.close()

           # Task log file, read inside the pod.
           # NOTE(review): this assumes the worker pod writes the log to the
           # same path rendered here from self.local_base; confirm for setups
           # where the webserver and the workers use different log folders.
           resp = kubernetes_stream(
               kube_client.connect_get_namespaced_pod_exec,
               pod_name,
               namespace,
               command=['cat', location],
               stderr=True,
               stdin=False,
               stdout=True,
               tty=False,
               _preload_content=False,
           )
           try:
               while resp.is_open():
                   resp.update(timeout=1)
                   if resp.peek_stdout():
                       parts.append(resp.read_stdout())
                   if resp.peek_stderr():
                       parts.append(resp.read_stderr())
           finally:
               resp.close()
       except Exception as err:
           # Best effort: report the failure inside the returned log text
           # instead of breaking the log view.
           parts.append(
               '*** Unable to fetch logs from worker pod {} ***\n{}\n\n'.format(pod_name, str(err))
           )

       return ''.join(parts), {'end_of_log': True}
   
   
   # Install the pod-aware reader at import time. FileTaskHandler, and any
   # subclass that does not override _read, will now use it.
   setattr(FileTaskHandler, '_read', _read)
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org