Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/18 07:37:19 UTC

[GitHub] [airflow] xinbinhuang opened a new pull request #15419: Subtract file log reader into separate functions

xinbinhuang opened a new pull request #15419:
URL: https://github.com/apache/airflow/pull/15419




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] xinbinhuang commented on a change in pull request #15419: Subtract file log reader into separate functions

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #15419:
URL: https://github.com/apache/airflow/pull/15419#discussion_r615425791



##########
File path: airflow/utils/log/file_task_handler.py
##########
@@ -111,76 +112,16 @@ def _read(self, ti, try_number, metadata=None):  # pylint: disable=unused-argume
         log_relative_path = self._render_filename(ti, try_number)
         location = os.path.join(self.local_base, log_relative_path)
 
-        log = ""
-
+        executor = conf.get('core', 'executor')
         if os.path.exists(location):
-            try:
-                with open(location) as file:
-                    log += f"*** Reading local file: {location}\n"
-                    log += "".join(file.readlines())
-            except Exception as e:  # pylint: disable=broad-except
-                log = f"*** Failed to load local log file: {location}\n"
-                log += f"*** {str(e)}\n"
-        elif conf.get('core', 'executor') == 'KubernetesExecutor':  # pylint: disable=too-many-nested-blocks
-            try:
-                from airflow.kubernetes.kube_client import get_kube_client
-
-                kube_client = get_kube_client()
-
-                if len(ti.hostname) >= 63:
-                    # Kubernetes takes the pod name and truncates it for the hostname. This truncated hostname
-                    # is returned for the fqdn to comply with the 63 character limit imposed by DNS standards
-                    # on any label of a FQDN.
-                    pod_list = kube_client.list_namespaced_pod(conf.get('kubernetes', 'namespace'))
-                    matches = [
-                        pod.metadata.name
-                        for pod in pod_list.items
-                        if pod.metadata.name.startswith(ti.hostname)
-                    ]
-                    if len(matches) == 1:
-                        if len(matches[0]) > len(ti.hostname):
-                            ti.hostname = matches[0]
-
-                log += '*** Trying to get logs (last 100 lines) from worker pod {} ***\n\n'.format(
-                    ti.hostname
-                )
-
-                res = kube_client.read_namespaced_pod_log(
-                    name=ti.hostname,
-                    namespace=conf.get('kubernetes', 'namespace'),
-                    container='base',
-                    follow=False,
-                    tail_lines=100,
-                    _preload_content=False,
-                )
-
-                for line in res:
-                    log += line.decode()
-
-            except Exception as f:  # pylint: disable=broad-except
-                log += f'*** Unable to fetch logs from worker pod {ti.hostname} ***\n{str(f)}\n\n'
+            log = read_local_log(location)
+        elif executor == executor_constants.KUBERNETES_EXECUTOR or (
+            executor == executor_constants.CELERY_KUBERNETES_EXECUTOR
+            and ti.queue == conf.get('celery_kubernetes_executor', 'kubernetes_queue')
+        ):
+            log = read_kubernetes_pod_log(ti)
         else:
-            url = os.path.join("http://{ti.hostname}:{worker_log_server_port}/log", log_relative_path).format(
-                ti=ti, worker_log_server_port=conf.get('celery', 'WORKER_LOG_SERVER_PORT')
-            )
-            log += f"*** Log file does not exist: {location}\n"
-            log += f"*** Fetching from: {url}\n"
-            try:
-                timeout = None  # No timeout
-                try:
-                    timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
-                except (AirflowConfigException, ValueError):
-                    pass
-
-                response = requests.get(url, timeout=timeout)
-                response.encoding = "utf-8"
-
-                # Check if the resource was properly fetched
-                response.raise_for_status()
-
-                log += '\n' + response.text
-            except Exception as e:  # pylint: disable=broad-except
-                log += f"*** Failed to fetch log file from worker. {str(e)}\n"
+            log = read_celery_worker_log(ti, location, log_relative_path)

Review comment:
       I didn't plan to add this originally, but after separating the logic out, it became clearer to me that we may need it. I haven't tested it yet (I will), and I'd also like to know whether anyone using the `CELERY_KUBERNETES_EXECUTOR` has already seen this problem.
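
The branch order of the refactored `_read` can be restated as a small pure function, which makes the new `CELERY_KUBERNETES_EXECUTOR` case easy to unit-test without a webserver or cluster. This is a minimal sketch, not code from the PR: `choose_log_source` and its string return values are hypothetical, and the two executor-name constants are assumed to match the values in `airflow.executors.executor_constants`.

```python
# Assumed to mirror airflow.executors.executor_constants; not taken from this PR.
KUBERNETES_EXECUTOR = "KubernetesExecutor"
CELERY_KUBERNETES_EXECUTOR = "CeleryKubernetesExecutor"


def choose_log_source(
    local_file_exists: bool,
    executor: str,
    ti_queue: str,
    kubernetes_queue: str,
) -> str:
    """Hypothetical helper: decide which reader the webserver should use.

    Follows the branch order of the refactored _read():
    1. an existing local log file always wins;
    2. otherwise read from the worker pod if the task ran on Kubernetes,
       either under KubernetesExecutor or under CeleryKubernetesExecutor
       with the task routed to the configured Kubernetes queue;
    3. otherwise fall back to the Celery worker's log server.
    """
    if local_file_exists:
        return "local"
    if executor == KUBERNETES_EXECUTOR or (
        executor == CELERY_KUBERNETES_EXECUTOR and ti_queue == kubernetes_queue
    ):
        return "kubernetes"
    return "celery_worker"


if __name__ == "__main__":
    # A task on the Kubernetes queue under CeleryKubernetesExecutor: prints "kubernetes".
    print(choose_log_source(False, CELERY_KUBERNETES_EXECUTOR, "kubernetes", "kubernetes"))
```

Under the removed condition, which compared the executor only against `'KubernetesExecutor'`, that same task would have fallen through to the Celery worker branch.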







[GitHub] [airflow] github-actions[bot] closed pull request #15419: Subtract file log reader into separate functions

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #15419:
URL: https://github.com/apache/airflow/pull/15419


   





[GitHub] [airflow] mik-laj commented on a change in pull request #15419: Subtract file log reader into separate functions

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #15419:
URL: https://github.com/apache/airflow/pull/15419#discussion_r615927784



##########
File path: airflow/utils/log/file_task_handler.py
##########
@@ -261,3 +202,85 @@ def _init_file(self, ti):
                 logging.warning("OSError while change ownership of the log file")
 
         return full_path
+
+
+def read_local_log(location: str):

Review comment:
       This module has mixed responsibilities: it handles local files, remote files, and k8s logs. I think we should split it and move part of the code to [`log_reader.py`](https://github.com/apache/airflow/blob/master/airflow/utils/log/log_reader.py) so that these functions can also be used by other handlers.
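
The body of the PR's `read_local_log(location: str)` is not shown in this hunk. As a rough illustration of the kind of handler-agnostic helper that could be shared, here is the local-file branch deleted from `_read`, rewritten as a free function with no handler state. It is a sketch reconstructed from the removed code; its placement and exact signature are assumptions, not what the PR or `log_reader.py` actually contain.

```python
import os
import tempfile


def read_local_log(location: str) -> str:
    """Return a local task log prefixed with a header line.

    Mirrors the local-file branch removed from FileTaskHandler._read():
    on any error the partial output is discarded and only a failure
    message is returned.
    """
    try:
        with open(location) as file:
            return f"*** Reading local file: {location}\n" + file.read()
    except Exception as e:  # deliberately broad, matching the original branch
        return f"*** Failed to load local log file: {location}\n*** {e}\n"


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp_dir:
        path = os.path.join(tmp_dir, "1.log")
        with open(path, "w") as f:
            f.write("task started\n")
        print(read_local_log(path))
        print(read_local_log(os.path.join(tmp_dir, "missing.log")))
```

Because the function depends only on a path, any handler (or a module such as `log_reader.py`) could call it without inheriting from `FileTaskHandler`.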
   







[GitHub] [airflow] github-actions[bot] commented on pull request #15419: Subtract file log reader into separate functions

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15419:
URL: https://github.com/apache/airflow/pull/15419#issuecomment-885968322


   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.





[GitHub] [airflow] xinbinhuang commented on a change in pull request #15419: Subtract file log reader into separate functions

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #15419:
URL: https://github.com/apache/airflow/pull/15419#discussion_r615357893



##########
File path: airflow/utils/log/file_task_handler.py
##########
@@ -111,76 +112,16 @@ def _read(self, ti, try_number, metadata=None):  # pylint: disable=unused-argume
         log_relative_path = self._render_filename(ti, try_number)
         location = os.path.join(self.local_base, log_relative_path)
 
-        log = ""
-
+        executor = conf.get('core', 'executor')
         if os.path.exists(location):
-            try:
-                with open(location) as file:
-                    log += f"*** Reading local file: {location}\n"
-                    log += "".join(file.readlines())
-            except Exception as e:  # pylint: disable=broad-except
-                log = f"*** Failed to load local log file: {location}\n"
-                log += f"*** {str(e)}\n"
-        elif conf.get('core', 'executor') == 'KubernetesExecutor':  # pylint: disable=too-many-nested-blocks
-            try:
-                from airflow.kubernetes.kube_client import get_kube_client
-
-                kube_client = get_kube_client()
-
-                if len(ti.hostname) >= 63:
-                    # Kubernetes takes the pod name and truncates it for the hostname. This truncated hostname
-                    # is returned for the fqdn to comply with the 63 character limit imposed by DNS standards
-                    # on any label of a FQDN.
-                    pod_list = kube_client.list_namespaced_pod(conf.get('kubernetes', 'namespace'))
-                    matches = [
-                        pod.metadata.name
-                        for pod in pod_list.items
-                        if pod.metadata.name.startswith(ti.hostname)
-                    ]
-                    if len(matches) == 1:
-                        if len(matches[0]) > len(ti.hostname):
-                            ti.hostname = matches[0]
-
-                log += '*** Trying to get logs (last 100 lines) from worker pod {} ***\n\n'.format(
-                    ti.hostname
-                )
-
-                res = kube_client.read_namespaced_pod_log(
-                    name=ti.hostname,
-                    namespace=conf.get('kubernetes', 'namespace'),
-                    container='base',
-                    follow=False,
-                    tail_lines=100,
-                    _preload_content=False,
-                )
-
-                for line in res:
-                    log += line.decode()
-
-            except Exception as f:  # pylint: disable=broad-except
-                log += f'*** Unable to fetch logs from worker pod {ti.hostname} ***\n{str(f)}\n\n'
+            log = read_local_log(location)
+        elif executor == executor_constants.KUBERNETES_EXECUTOR or (
+            executor == executor_constants.CELERY_KUBERNETES_EXECUTOR
+            and ti.queue == conf.get('celery_kubernetes_executor', 'kubernetes_queue')

Review comment:
       I didn't plan to add this originally, but after separating the logic out, it became clearer to me that we need it. I haven't tested it yet (I will), and I'd also like to know whether anyone using the `CELERY_KUBERNETES_EXECUTOR` has already seen this problem.
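
The Kubernetes branch moved by this hunk works around hostname truncation: per the removed comment, Kubernetes truncates the pod name when deriving the hostname to respect the 63-character DNS label limit, so a long `ti.hostname` may be only a prefix of the real pod name. The matching rule can be isolated as a pure function over candidate pod names. This is a sketch: `resolve_pod_name` is a hypothetical name, and in the real code the candidates come from `kube_client.list_namespaced_pod(...)`.

```python
from typing import Iterable


def resolve_pod_name(hostname: str, pod_names: Iterable[str]) -> str:
    """Map a possibly truncated task hostname back to its full pod name.

    Same rule as the removed inline code: only hostnames of 63 or more
    characters are treated as truncated, and a candidate is used only if it
    is the single pod name starting with the hostname and is strictly longer.
    """
    if len(hostname) < 63:
        return hostname
    matches = [name for name in pod_names if name.startswith(hostname)]
    if len(matches) == 1 and len(matches[0]) > len(hostname):
        return matches[0]
    return hostname
```

Unlike the removed code, this returns the resolved name instead of assigning it to `ti.hostname`, which keeps the lookup free of side effects on the task instance.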







[GitHub] [airflow] xinbinhuang commented on a change in pull request #15419: Subtract file log reader into separate functions

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #15419:
URL: https://github.com/apache/airflow/pull/15419#discussion_r615425791



##########
File path: airflow/utils/log/file_task_handler.py
##########
@@ -111,76 +112,16 @@ def _read(self, ti, try_number, metadata=None):  # pylint: disable=unused-argume
         log_relative_path = self._render_filename(ti, try_number)
         location = os.path.join(self.local_base, log_relative_path)
 
-        log = ""
-
+        executor = conf.get('core', 'executor')
         if os.path.exists(location):
-            try:
-                with open(location) as file:
-                    log += f"*** Reading local file: {location}\n"
-                    log += "".join(file.readlines())
-            except Exception as e:  # pylint: disable=broad-except
-                log = f"*** Failed to load local log file: {location}\n"
-                log += f"*** {str(e)}\n"
-        elif conf.get('core', 'executor') == 'KubernetesExecutor':  # pylint: disable=too-many-nested-blocks
-            try:
-                from airflow.kubernetes.kube_client import get_kube_client
-
-                kube_client = get_kube_client()
-
-                if len(ti.hostname) >= 63:
-                    # Kubernetes takes the pod name and truncates it for the hostname. This truncated hostname
-                    # is returned for the fqdn to comply with the 63 character limit imposed by DNS standards
-                    # on any label of a FQDN.
-                    pod_list = kube_client.list_namespaced_pod(conf.get('kubernetes', 'namespace'))
-                    matches = [
-                        pod.metadata.name
-                        for pod in pod_list.items
-                        if pod.metadata.name.startswith(ti.hostname)
-                    ]
-                    if len(matches) == 1:
-                        if len(matches[0]) > len(ti.hostname):
-                            ti.hostname = matches[0]
-
-                log += '*** Trying to get logs (last 100 lines) from worker pod {} ***\n\n'.format(
-                    ti.hostname
-                )
-
-                res = kube_client.read_namespaced_pod_log(
-                    name=ti.hostname,
-                    namespace=conf.get('kubernetes', 'namespace'),
-                    container='base',
-                    follow=False,
-                    tail_lines=100,
-                    _preload_content=False,
-                )
-
-                for line in res:
-                    log += line.decode()
-
-            except Exception as f:  # pylint: disable=broad-except
-                log += f'*** Unable to fetch logs from worker pod {ti.hostname} ***\n{str(f)}\n\n'
+            log = read_local_log(location)
+        elif executor == executor_constants.KUBERNETES_EXECUTOR or (
+            executor == executor_constants.CELERY_KUBERNETES_EXECUTOR
+            and ti.queue == conf.get('celery_kubernetes_executor', 'kubernetes_queue')
+        ):
+            log = read_kubernetes_pod_log(ti)
         else:
-            url = os.path.join("http://{ti.hostname}:{worker_log_server_port}/log", log_relative_path).format(
-                ti=ti, worker_log_server_port=conf.get('celery', 'WORKER_LOG_SERVER_PORT')
-            )
-            log += f"*** Log file does not exist: {location}\n"
-            log += f"*** Fetching from: {url}\n"
-            try:
-                timeout = None  # No timeout
-                try:
-                    timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
-                except (AirflowConfigException, ValueError):
-                    pass
-
-                response = requests.get(url, timeout=timeout)
-                response.encoding = "utf-8"
-
-                # Check if the resource was properly fetched
-                response.raise_for_status()
-
-                log += '\n' + response.text
-            except Exception as e:  # pylint: disable=broad-except
-                log += f"*** Failed to fetch log file from worker. {str(e)}\n"
+            log = read_celery_worker_log(ti, location, log_relative_path)

Review comment:
       The original behavior seems to always read logs from the Celery worker node under the `CELERY_KUBERNETES_EXECUTOR`, so the extra check allows the webserver to fetch logs from the k8s pod when the TI is running on the Kubernetes queue.
   
   I'm not 100% sure about this, since I have never used the `CELERY_KUBERNETES_EXECUTOR` before, and I will need to test it a bit. Meanwhile, I wonder whether anyone using the `CELERY_KUBERNETES_EXECUTOR` has already seen this problem.
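
For reference, the Celery-worker fallback that these tasks previously always hit builds an HTTP URL from the task's hostname and `[celery] worker_log_server_port`, and reads `[webserver] log_fetch_timeout_sec`, using no timeout if the option is missing or not an integer. The pure parts can be sketched as below. Both helper names are hypothetical, and the sketch uses `posixpath.join` instead of the removed `os.path.join` so the URL separator is always `/`; that is a deliberate deviation, not the PR's code.

```python
import posixpath
from typing import Optional


def build_worker_log_url(hostname: str, port: int, log_relative_path: str) -> str:
    """Hypothetical helper: URL of a log file served by a worker.

    Same shape as the removed code: http://<hostname>:<port>/log/<relative path>.
    posixpath.join keeps "/" as the separator regardless of the webserver's OS.
    """
    return posixpath.join(f"http://{hostname}:{port}/log", log_relative_path)


def parse_fetch_timeout(raw: Optional[str]) -> Optional[int]:
    """Hypothetical helper: interpret the log_fetch_timeout_sec option.

    Returns None, meaning "no timeout" as in the removed code, when the
    value is missing or cannot be parsed as an integer.
    """
    if raw is None:
        return None
    try:
        return int(raw)
    except ValueError:
        return None
```

With these pieces, the network step reduces to `requests.get(url, timeout=timeout)` followed by `raise_for_status()`, as in the removed branch.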







[GitHub] [airflow] xinbinhuang commented on a change in pull request #15419: Subtract file log reader into separate functions

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #15419:
URL: https://github.com/apache/airflow/pull/15419#discussion_r615357893



##########
File path: airflow/utils/log/file_task_handler.py
##########
@@ -111,76 +112,16 @@ def _read(self, ti, try_number, metadata=None):  # pylint: disable=unused-argume
         log_relative_path = self._render_filename(ti, try_number)
         location = os.path.join(self.local_base, log_relative_path)
 
-        log = ""
-
+        executor = conf.get('core', 'executor')
         if os.path.exists(location):
-            try:
-                with open(location) as file:
-                    log += f"*** Reading local file: {location}\n"
-                    log += "".join(file.readlines())
-            except Exception as e:  # pylint: disable=broad-except
-                log = f"*** Failed to load local log file: {location}\n"
-                log += f"*** {str(e)}\n"
-        elif conf.get('core', 'executor') == 'KubernetesExecutor':  # pylint: disable=too-many-nested-blocks
-            try:
-                from airflow.kubernetes.kube_client import get_kube_client
-
-                kube_client = get_kube_client()
-
-                if len(ti.hostname) >= 63:
-                    # Kubernetes takes the pod name and truncates it for the hostname. This truncated hostname
-                    # is returned for the fqdn to comply with the 63 character limit imposed by DNS standards
-                    # on any label of a FQDN.
-                    pod_list = kube_client.list_namespaced_pod(conf.get('kubernetes', 'namespace'))
-                    matches = [
-                        pod.metadata.name
-                        for pod in pod_list.items
-                        if pod.metadata.name.startswith(ti.hostname)
-                    ]
-                    if len(matches) == 1:
-                        if len(matches[0]) > len(ti.hostname):
-                            ti.hostname = matches[0]
-
-                log += '*** Trying to get logs (last 100 lines) from worker pod {} ***\n\n'.format(
-                    ti.hostname
-                )
-
-                res = kube_client.read_namespaced_pod_log(
-                    name=ti.hostname,
-                    namespace=conf.get('kubernetes', 'namespace'),
-                    container='base',
-                    follow=False,
-                    tail_lines=100,
-                    _preload_content=False,
-                )
-
-                for line in res:
-                    log += line.decode()
-
-            except Exception as f:  # pylint: disable=broad-except
-                log += f'*** Unable to fetch logs from worker pod {ti.hostname} ***\n{str(f)}\n\n'
+            log = read_local_log(location)
+        elif executor == executor_constants.KUBERNETES_EXECUTOR or (
+            executor == executor_constants.CELERY_KUBERNETES_EXECUTOR
+            and ti.queue == conf.get('celery_kubernetes_executor', 'kubernetes_queue')

Review comment:
       I didn't plan to add this originally, but after separating the logic out, it became clearer to me that we may need it. I haven't tested it yet (I will), and I'd also like to know whether anyone using the `CELERY_KUBERNETES_EXECUTOR` has already seen this problem.
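
Since this check makes the Kubernetes reader reachable from a second executor, it may help for the reader to take its client as a parameter so it can be tested without a cluster. A minimal sketch, assuming a client object exposing `read_namespaced_pod_log(...)` with the keyword arguments used in the removed code (the official `kubernetes` Python client's `CoreV1Api` provides this method); the function signature and `FakeClient` are hypothetical.

```python
from typing import Any, List, Optional


def read_kubernetes_pod_log(client: Any, pod_name: str, namespace: str, tail_lines: int = 100) -> str:
    """Fetch the last `tail_lines` lines from the pod's 'base' container.

    Message formats follow the removed branch of FileTaskHandler._read().
    """
    log = f"*** Trying to get logs (last {tail_lines} lines) from worker pod {pod_name} ***\n\n"
    try:
        res = client.read_namespaced_pod_log(
            name=pod_name,
            namespace=namespace,
            container="base",
            follow=False,
            tail_lines=tail_lines,
            _preload_content=False,
        )
        for chunk in res:  # raw byte chunks when _preload_content=False
            log += chunk.decode()
    except Exception as e:  # deliberately broad, matching the original branch
        log += f"*** Unable to fetch logs from worker pod {pod_name} ***\n{e}\n\n"
    return log


class FakeClient:
    """Hypothetical test double standing in for kubernetes.client.CoreV1Api."""

    def __init__(self, chunks: Optional[List[bytes]] = None, error: Optional[Exception] = None):
        self.chunks = chunks or []
        self.error = error
        self.calls: List[dict] = []

    def read_namespaced_pod_log(self, **kwargs):
        self.calls.append(kwargs)  # record arguments for assertions
        if self.error is not None:
            raise self.error
        return iter(self.chunks)
```

In production the real client would come from `get_kube_client()`, as in the removed code; in tests, `FakeClient` can return canned chunks or raise.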



