Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2023/01/02 18:28:42 UTC

[GitHub] [airflow] MaicoTimmerman commented on a diff in pull request #28626: Introduce logging integration for Apache WebHDFS

MaicoTimmerman commented on code in PR #28626:
URL: https://github.com/apache/airflow/pull/28626#discussion_r1060149945


##########
airflow/config_templates/airflow_local_settings.py:
##########
@@ -286,6 +287,17 @@
             },
         }
         DEFAULT_LOGGING_CONFIG["handlers"].update(OSS_REMOTE_HANDLERS)
+    elif REMOTE_BASE_LOG_FOLDER.startswith("webhdfs://"):
+        WEBHDFS_REMOTE_HANDLERS = {
+            "task": {
+                "class": "airflow.providers.apache.hdfs.log.webhdfs_task_handler.WebHDFSTaskHandler",
+                "formatter": "airflow",
+                "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
+                "webhdfs_log_folder": REMOTE_BASE_LOG_FOLDER,
+                "filename_template": FILENAME_TEMPLATE,
+            }
+        }
+        DEFAULT_LOGGING_CONFIG["handlers"].update(WEBHDFS_REMOTE_HANDLERS)

Review Comment:
   I agree with you, the current solution scales very poorly.
   
   Before I try to implement an improvement, I'm not sure what direction we should take on this. I could create a class/function that does the same switch logic and centralizes the common parameters (`base_log_folder`, `REMOTE_BASE_LOG_FOLDER`, `FILENAME_TEMPLATE`, and `formatter`). Is that what you mean?
   
   Adding to that, I feel the issue here is more about the non-uniformity of the remote loggers, as I described in my other comment. If we are looking for a better, uniform solution, I propose we discuss the remote logging architecture in a separate issue, as it would be huge scope creep for this PR.
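   
   To make the first option concrete, here is a rough sketch of such a helper (the name `_remote_task_handler_config` is made up for illustration; it assumes the existing module-level `BASE_LOG_FOLDER` and `FILENAME_TEMPLATE`):
   
   ```python
   def _remote_task_handler_config(handler_class: str, **extra) -> dict:
       """Build the shared part of a remote "task" handler configuration."""
       handler = {
           "class": handler_class,
           "formatter": "airflow",
           "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
           "filename_template": FILENAME_TEMPLATE,
       }
       handler.update(extra)
       return {"task": handler}


   # Each branch of the elif chain would then only pass its backend-specific keys:
   DEFAULT_LOGGING_CONFIG["handlers"].update(
       _remote_task_handler_config(
           "airflow.providers.apache.hdfs.log.webhdfs_task_handler.WebHDFSTaskHandler",
           webhdfs_log_folder=REMOTE_BASE_LOG_FOLDER,
       )
   )
   ```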



##########
airflow/providers/apache/hdfs/hooks/webhdfs.py:
##########
@@ -153,3 +159,63 @@ def load_file(
             hdfs_path=destination, local_path=source, overwrite=overwrite, n_threads=parallelism, **kwargs
         )
         self.log.debug("Uploaded file %s to %s", source, destination)
+
+    @staticmethod
+    def parse_webhdfs_url(webhdfs_url: str) -> str:
+        """
+        Parse a WebHDFS URL into a path.
+
+        :param webhdfs_url: The WebHDFS URL to parse.
+        :return: the parsed path
+        """
+        parsed_url = urlsplit(webhdfs_url)
+
+        if not parsed_url.path:
+            raise AirflowException("Please provide a non-empty path")
+
+        if parsed_url.netloc != "" or not parsed_url.path.startswith("/"):
+            raise AirflowException(
+                f'Please provide an absolute path instead of "{parsed_url.netloc}{parsed_url.path}"'
+            )
+
+        return parsed_url.path
+
+    def read_file(self, hdfs_path: str) -> str:
+        """
+        Read a file from HDFS and return its content as a string.
+
+        :param hdfs_path: The path of the file to read.
+        :return: the decoded content of the file
+        """
+        conn = self.get_conn()
+        with conn.read(hdfs_path) as reader:
+            return reader.read().decode()
+
+    def write_file(self, log: str, hdfs_path: str, write_mode: str = "wb", overwrite: bool = True) -> None:
+        if write_mode in ("wb", "ab"):
+            if "w" in write_mode and self.check_for_path(hdfs_path) and not overwrite:
+                raise FileExistsError(f"File {hdfs_path} already exists.")
+            elif "a" in write_mode and not self.check_for_path(hdfs_path):
+                raise FileNotFoundError(f"File {hdfs_path} not found.")
+        else:
+            raise ValueError(f"Supported modes are only `wb` and `ab` but was '{write_mode}'.")
+
+        conn = self.get_conn()
+        if write_mode == "wb":
+            conn.write(hdfs_path, log.encode(), overwrite=overwrite)
+        elif write_mode == "ab":
+            conn.write(hdfs_path, log.encode(), overwrite=overwrite, append=True)

Review Comment:
   Ditto.



##########
airflow/providers/apache/hdfs/log/webhdfs_task_handler.py:
##########
@@ -0,0 +1,138 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import os
+import pathlib
+from typing import Any
+
+from airflow.compat.functools import cached_property
+from airflow.configuration import conf
+from airflow.models import TaskInstance
+from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.state import State
+
+
+class WebHDFSTaskHandler(FileTaskHandler, LoggingMixin):
+    """A TaskHandler that uploads logs to HDFS once done. During the run, it will show the executor logs."""
+
+    def __init__(self, base_log_folder: str, webhdfs_log_folder: str, filename_template: str | None = None):
+        super().__init__(base_log_folder=base_log_folder, filename_template=filename_template)
+        # Overwrite the location to write task logs to.
+        self.remote_base = WebHDFSHook.parse_webhdfs_url(webhdfs_log_folder)
+        self.log_relative_path = ""
+        self.upload_on_close = False
+        self.closed = False
+
+    @cached_property
+    def hook(self) -> WebHDFSHook:
+        """Returns WebHDFSHook."""
+        return WebHDFSHook(webhdfs_conn_id=conf.get("logging", "REMOTE_LOG_CONN_ID"))
+
+    def set_context(self, ti: TaskInstance) -> None:
+        """Provide task_instance context to airflow task handler."""
+        super().set_context(ti)
+        self.log_relative_path = self._render_filename(ti, ti.try_number)
+        self.upload_on_close = not ti.raw
+
+        # Clear the file first so that duplicate data is not uploaded
+        # when re-using the same path (e.g. with rescheduled sensors)
+        if self.upload_on_close and self.handler is not None:
+            with open(self.handler.baseFilename, "w"):
+                pass
+
+    def close(self) -> None:
+        """Close and upload local log file to remote storage GCS."""
+        # When application exit, system shuts down all handlers by
+        # calling close method. Here we check if logger is already
+        # closed to prevent uploading the log to remote storage multiple
+        # times when `logging.shutdown` is called.
+        if self.closed:
+            return
+
+        super().close()
+
+        if not self.upload_on_close:
+            return
+
+        local_loc = os.path.join(self.local_base, self.log_relative_path)
+        remote_loc = os.path.join(self.remote_base, self.log_relative_path)
+        # ":" is not a valid character in HDFS paths.
+        remote_loc = remote_loc.replace(":", "-")
+        if os.path.exists(local_loc):
+            log = pathlib.Path(local_loc).read_text()
+            self.hdfs_write(log, remote_loc)
+
+        # Mark closed, so we don't double write if close is called twice
+        self.closed = True
+
+    def _read(
+        self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | None = None
+    ) -> tuple[str, dict[str, Any]]:
+        """
+        Template method that contains custom logic of reading logs given the try_number.
+
+        :param ti: task instance record
+        :param try_number: current try_number to read log from
+        :param metadata: log metadata, can be used for streaming log reading and auto-tailing.
+        :return: log message as a string and metadata.
+        """
+        log_relative_path = self._render_filename(ti, try_number)
+        remote_loc = os.path.join(self.remote_base, log_relative_path)
+        # ":" is not a valid character in HDFS paths.
+        remote_loc = remote_loc.replace(":", "-")
+
+        log = ""
+        checking_running_task = ti.try_number == try_number and ti.state == State.RUNNING
+        if not checking_running_task and self.hook.check_for_path(remote_loc):
+            try:
+                log += f"*** Reading remote log from HDFS: {remote_loc}.\n"
+                log += f"{self.hook.read_file(remote_loc)}\n"
+            except Exception as e:
+                log += f"*** Failed to load HDFS log file: {remote_loc}.\n"
+                log += f"*** {str(e)}\n"
+        else:
+            if checking_running_task:
+                log += "*** Task is still running. Getting logs directly from executors.\n"
+            log_of_executors, _ = super()._read(ti=ti, try_number=try_number, metadata=metadata)
+            log += log_of_executors
+        return log, {"end_of_log": True}
+
+    def hdfs_write(self, log: str, hdfs_path: str, max_retry: int = 1) -> None:
+        directory = os.path.dirname(hdfs_path)
+
+        # Default to a single retry attempt because upload failures are
+        # rare but occasionally occur.  Multiple retry attempts are unlikely
+        # to help as they usually indicate non-ephemeral errors.
+        for try_num in range(1 + max_retry):
+            try:
+                if not self.hook.check_for_path(directory):
+                    self.hook.make_directory(directory)
+
+                if self.hook.is_file(hdfs_path):
+                    self.hook.write_file(log, hdfs_path, write_mode="ab", overwrite=False)
+                else:
+                    self.hook.write_file(log, hdfs_path, write_mode="wb")
+                break

Review Comment:
   In this case I'd argue against breaking this out into a helper function, simply to keep parity between the different RemoteTaskHandlers (S3, CloudWatch, GCP, etc.). 
   
   All the different remote logging solutions have been written independently, so they all have their own micro-optimizations. I felt retrying was worth taking from S3, but ultimately I'd say we should try to unite all non-provider-specific logic (e.g. `upload_on_close`, `checking_running_task`, etc.) in the single `ExternalLoggingMixin`, which is currently only used by the Elasticsearch handler and could be a good place to unify this.
   
   My proposal would be to open a new issue in which the expansion of the interface of this mixin can be discussed.
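   
   To sketch the kind of interface expansion I mean (names and signatures here are invented for discussion; this is not the current `ExternalLoggingMixin` API):
   
   ```python
   from abc import ABC, abstractmethod

   from airflow.utils.state import State


   class RemoteTaskHandlerMixin(ABC):  # hypothetical name for the expanded mixin
       """Non-provider-specific logic that every remote task handler currently duplicates."""

       upload_on_close: bool = False
       closed: bool = False

       def is_running_attempt(self, ti, try_number: int) -> bool:
           # The `checking_running_task` check repeated in each handler's _read().
           return ti.try_number == try_number and ti.state == State.RUNNING

       @abstractmethod
       def upload(self, local_loc: str, remote_loc: str) -> None:
           """Provider-specific upload of the finished local log file."""

       @abstractmethod
       def read_remote(self, remote_loc: str) -> str:
           """Provider-specific read of a previously uploaded log file."""
   ```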



##########
airflow/providers/apache/hdfs/hooks/webhdfs.py:
##########
@@ -153,3 +159,63 @@ def load_file(
             hdfs_path=destination, local_path=source, overwrite=overwrite, n_threads=parallelism, **kwargs
         )
         self.log.debug("Uploaded file %s to %s", source, destination)
+
+    @staticmethod
+    def parse_webhdfs_url(webhdfs_url: str) -> str:
+        """
+        Parse a WebHDFS URL into a path.
+
+        :param webhdfs_url: The WebHDFS URL to parse.
+        :return: the parsed path
+        """
+        parsed_url = urlsplit(webhdfs_url)
+
+        if not parsed_url.path:
+            raise AirflowException("Please provide a non-empty path")
+
+        if parsed_url.netloc != "" or not parsed_url.path.startswith("/"):
+            raise AirflowException(
+                f'Please provide an absolute path instead of "{parsed_url.netloc}{parsed_url.path}"'
+            )
+
+        return parsed_url.path
+
+    def read_file(self, hdfs_path: str) -> str:
+        """
+        Read a file from HDFS and return its content as a string.
+
+        :param hdfs_path: The path of the file to read.
+        :return: the decoded content of the file
+        """
+        conn = self.get_conn()
+        with conn.read(hdfs_path) as reader:
+            return reader.read().decode()
+
+    def write_file(self, log: str, hdfs_path: str, write_mode: str = "wb", overwrite: bool = True) -> None:
+        if write_mode in ("wb", "ab"):
+            if "w" in write_mode and self.check_for_path(hdfs_path) and not overwrite:
+                raise FileExistsError(f"File {hdfs_path} already exists.")
+            elif "a" in write_mode and not self.check_for_path(hdfs_path):
+                raise FileNotFoundError(f"File {hdfs_path} not found.")

Review Comment:
   Instead of helper functions, I decided to split this function into `append_file` and `write_file` and removed the `write_mode` parameter. I feel this makes for a better hook interface and simplifies the function calls in the task logger.
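   
   Roughly, the split looks like this (illustrative only, the exact signatures in the updated diff may differ; `check_for_path()` and `get_conn()` already exist on the hook):
   
   ```python
   # Methods on WebHDFSHook:
   def write_file(self, log: str, hdfs_path: str, overwrite: bool = True) -> None:
       """Write `log` to a new (or overwritten) file in HDFS."""
       if not overwrite and self.check_for_path(hdfs_path):
           raise FileExistsError(f"File {hdfs_path} already exists.")
       self.get_conn().write(hdfs_path, log.encode(), overwrite=overwrite)

   def append_file(self, log: str, hdfs_path: str) -> None:
       """Append `log` to an existing file in HDFS."""
       if not self.check_for_path(hdfs_path):
           raise FileNotFoundError(f"File {hdfs_path} not found.")
       self.get_conn().write(hdfs_path, log.encode(), append=True)
   ```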



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org