You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2023/01/11 21:25:47 UTC

[airflow] 06/27: Ensure correct log dir in file task handler (#28477)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 7246ee01d87290a6f59ae9b968d982214374e4d9
Author: Ping Zhang <pi...@umich.edu>
AuthorDate: Fri Jan 6 11:07:00 2023 -0800

    Ensure correct log dir in file task handler (#28477)
    
    since Path.mkdir combines with the process’ umask value to determine
    the file mode and access flags, thus the newly created folder isn't
    0o777
    
    (cherry picked from commit bda39188bd127d0dd933cdff6c7e8d11ec6bf41b)
---
 airflow/utils/log/file_task_handler.py | 37 +++++++++++++++++++++++++++-------
 1 file changed, 30 insertions(+), 7 deletions(-)

diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 1dcdd745ac..f8bf2043ab 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -328,6 +328,35 @@ class FileTaskHandler(logging.Handler):
 
         return logs, metadata_array
 
+    def _prepare_log_folder(self, directory: Path):
+        """
+        Prepare the log folder and ensure its mode is 777.
+
+        To handle log writing when tasks are impersonated, the log files need to
+        be writable by the user that runs the Airflow command and the user
+        that is impersonated. This is mainly to handle corner cases with the
+        SubDagOperator. When the SubDagOperator is run, all of the operators
+        run under the impersonated user and create appropriate log files
+        as the impersonated user. However, if the user manually runs tasks
+        of the SubDagOperator through the UI, then the log files are created
+        by the user that runs the Airflow command. For example, the Airflow
+        run command may be run by the `airflow_sudoable` user, but the Airflow
+        tasks may be run by the `airflow` user. If the log files are not
+        writable by both users, then it's possible that re-running a task
+        via the UI (or vice versa) results in a permission error as the task
+        tries to write to a log file created by the other user.
+
+        Create the log file and give it group writable permissions
+        TODO(aoen): Make log dirs and logs globally readable for now since the SubDag
+        operator is not compatible with impersonation (e.g. if a Celery executor is used
+        for a SubDag operator and the SubDag operator has a different owner than the
+        parent DAG)
+        """
+        mode = 0o777
+        directory.mkdir(mode=mode, parents=True, exist_ok=True)
+        if directory.stat().st_mode != mode:
+            directory.chmod(mode)
+
     def _init_file(self, ti):
         """
         Create log directory and give it correct permissions.
@@ -350,13 +379,7 @@ class FileTaskHandler(logging.Handler):
         # tries to write to a log file created by the other user.
         relative_path = self._render_filename(ti, ti.try_number)
         full_path = os.path.join(self.local_base, relative_path)
-        directory = os.path.dirname(full_path)
-        # Create the log file and give it group writable permissions
-        # TODO(aoen): Make log dirs and logs globally readable for now since the SubDag
-        # operator is not compatible with impersonation (e.g. if a Celery executor is used
-        # for a SubDag operator and the SubDag operator has a different owner than the
-        # parent DAG)
-        Path(directory).mkdir(mode=0o777, parents=True, exist_ok=True)
+        self._prepare_log_folder(Path(full_path).parent)
 
         if not os.path.exists(full_path):
             open(full_path, "a").close()