You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2023/01/11 21:25:47 UTC
[airflow] 06/27: Ensure correct log dir in file task handler (#28477)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 7246ee01d87290a6f59ae9b968d982214374e4d9
Author: Ping Zhang <pi...@umich.edu>
AuthorDate: Fri Jan 6 11:07:00 2023 -0800
Ensure correct log dir in file task handler (#28477)
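Path.mkdir combines the requested mode with the process' umask value to
determine the file mode and access flags, so a newly created folder is not
guaranteed to be 0o777. Check the folder's mode after creating it and chmod
it when it differs.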
(cherry picked from commit bda39188bd127d0dd933cdff6c7e8d11ec6bf41b)
---
airflow/utils/log/file_task_handler.py | 37 +++++++++++++++++++++++++++-------
1 file changed, 30 insertions(+), 7 deletions(-)
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 1dcdd745ac..f8bf2043ab 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -328,6 +328,35 @@ class FileTaskHandler(logging.Handler):
return logs, metadata_array
+    def _prepare_log_folder(self, directory: Path) -> None:
+        """
+        Prepare the log folder and ensure its permission bits are 0o777.
+
+        To handle log writing when tasks are impersonated, the log files need to
+        be writable by the user that runs the Airflow command and the user
+        that is impersonated. This is mainly to handle corner cases with the
+        SubDagOperator. When the SubDagOperator is run, all of the operators
+        run under the impersonated user and create appropriate log files
+        as the impersonated user. However, if the user manually runs tasks
+        of the SubDagOperator through the UI, then the log files are created
+        by the user that runs the Airflow command. For example, the Airflow
+        run command may be run by the `airflow_sudoable` user, but the Airflow
+        tasks may be run by the `airflow` user. If the log files are not
+        writable by both users, then it's possible that re-running a task
+        via the UI (or vice versa) results in a permission error as the task
+        tries to write to a log file created by the other user.
+
+        ``Path.mkdir`` combines the requested mode with the process umask and
+        leaves an already existing folder untouched, so the permission bits are
+        checked after the call and corrected with ``chmod`` only when they differ.
+
+        TODO(aoen): Make log dirs and logs globally readable for now since the SubDag
+        operator is not compatible with impersonation (e.g. if a Celery executor is used
+        for a SubDag operator and the SubDag operator has a different owner than the
+        parent DAG)
+
+        :param directory: folder to create (including missing parents) and open up
+        """
+        mode = 0o777
+        directory.mkdir(mode=mode, parents=True, exist_ok=True)
+        # st_mode also carries the file type bits (S_IFDIR for a directory), so
+        # comparing it to ``mode`` directly never matches and chmod would run on
+        # every call, failing with PermissionError when the folder is owned by
+        # the other user even though it is already 0o777. Compare only the
+        # read/write/execute permission bits.
+        if directory.stat().st_mode & 0o777 != mode:
+            directory.chmod(mode)
+
def _init_file(self, ti):
"""
Create log directory and give it correct permissions.
@@ -350,13 +379,7 @@ class FileTaskHandler(logging.Handler):
# tries to write to a log file created by the other user.
relative_path = self._render_filename(ti, ti.try_number)
full_path = os.path.join(self.local_base, relative_path)
- directory = os.path.dirname(full_path)
- # Create the log file and give it group writable permissions
- # TODO(aoen): Make log dirs and logs globally readable for now since the SubDag
- # operator is not compatible with impersonation (e.g. if a Celery executor is used
- # for a SubDag operator and the SubDag operator has a different owner than the
- # parent DAG)
- Path(directory).mkdir(mode=0o777, parents=True, exist_ok=True)
+ self._prepare_log_folder(Path(full_path).parent)
if not os.path.exists(full_path):
open(full_path, "a").close()