You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/12/17 23:53:01 UTC

[jira] [Commented] (AIRFLOW-1775) Remote file handler for logging

    [ https://issues.apache.org/jira/browse/AIRFLOW-1775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723490#comment-16723490 ] 

ASF GitHub Bot commented on AIRFLOW-1775:
-----------------------------------------

stale[bot] closed pull request #2757: [AIRFLOW-1775] Remote File Task Handler
URL: https://github.com/apache/incubator-airflow/pull/2757
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index e7d257f06a..5c09efaac9 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -1,199 +1,204 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-import os
-import requests
-
-from jinja2 import Template
-
-from airflow import configuration as conf
-from airflow.configuration import AirflowConfigException
-from airflow.utils.file import mkdirs
-
-
-class FileTaskHandler(logging.Handler):
-    """
-    FileTaskHandler is a python log handler that handles and reads
-    task instance logs. It creates and delegates log handling
-    to `logging.FileHandler` after receiving task instance context.
-    It reads logs from task instance's host machine.
-    """
-
-    def __init__(self, base_log_folder, filename_template):
-        """
-        :param base_log_folder: Base log folder to place logs.
-        :param filename_template: template filename string
-        """
-        super(FileTaskHandler, self).__init__()
-        self.handler = None
-        self.local_base = base_log_folder
-        self.filename_template = filename_template
-        self.filename_jinja_template = None
-
-        if "{{" in self.filename_template: #jinja mode
-            self.filename_jinja_template = Template(self.filename_template)
-
-    def set_context(self, ti):
-        """
-        Provide task_instance context to airflow task handler.
-        :param ti: task instance object
-        """
-        local_loc = self._init_file(ti)
-        self.handler = logging.FileHandler(local_loc)
-        self.handler.setFormatter(self.formatter)
-        self.handler.setLevel(self.level)
-
-    def emit(self, record):
-        if self.handler is not None:
-            self.handler.emit(record)
-
-    def flush(self):
-        if self.handler is not None:
-            self.handler.flush()
-
-    def close(self):
-        if self.handler is not None:
-            self.handler.close()
-
-    def _render_filename(self, ti, try_number):
-        if self.filename_jinja_template:
-            jinja_context = ti.get_template_context()
-            jinja_context['try_number'] = try_number
-            return self.filename_jinja_template.render(**jinja_context)
-
-        return self.filename_template.format(dag_id=ti.dag_id,
-                                             task_id=ti.task_id,
-                                             execution_date=ti.execution_date.isoformat(),
-                                             try_number=try_number)
-
-    def _read(self, ti, try_number):
-        """
-        Template method that contains custom logic of reading
-        logs given the try_number.
-        :param ti: task instance record
-        :param try_number: current try_number to read log from
-        :return: log message as a string
-        """
-        # Task instance here might be different from task instance when
-        # initializing the handler. Thus explicitly getting log location
-        # is needed to get correct log path.
-        log_relative_path = self._render_filename(ti, try_number)
-        location = os.path.join(self.local_base, log_relative_path)
-
-        log = ""
-
-        if os.path.exists(location):
-            try:
-                with open(location) as f:
-                    log += "*** Reading local file: {}\n".format(location)
-                    log += "".join(f.readlines())
-            except Exception as e:
-                log = "*** Failed to load local log file: {}\n".format(location)
-                log += "*** {}\n".format(str(e))
-        else:
-            url = os.path.join(
-                "http://{ti.hostname}:{worker_log_server_port}/log", log_relative_path
-            ).format(
-                ti=ti,
-                worker_log_server_port=conf.get('celery', 'WORKER_LOG_SERVER_PORT')
-            )
-            log += "*** Log file does not exist: {}\n".format(location)
-            log += "*** Fetching from: {}\n".format(url)
-            try:
-                timeout = None  # No timeout
-                try:
-                    timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
-                except (AirflowConfigException, ValueError):
-                    pass
-
-                response = requests.get(url, timeout=timeout)
-
-                # Check if the resource was properly fetched
-                response.raise_for_status()
-
-                log += '\n' + response.text
-            except Exception as e:
-                log += "*** Failed to fetch log file from worker. {}\n".format(str(e))
-
-        return log
-
-    def read(self, task_instance, try_number=None):
-        """
-        Read logs of given task instance from local machine.
-        :param task_instance: task instance object
-        :param try_number: task instance try_number to read logs from. If None
-                           it returns all logs separated by try_number
-        :return: a list of logs
-        """
-        # Task instance increments its try number when it starts to run.
-        # So the log for a particular task try will only show up when
-        # try number gets incremented in DB, i.e logs produced the time
-        # after cli run and before try_number + 1 in DB will not be displayed.
-
-        if try_number is None:
-            next_try = task_instance.next_try_number
-            try_numbers = list(range(1, next_try))
-        elif try_number < 1:
-            logs = [
-                'Error fetching the logs. Try number {} is invalid.'.format(try_number),
-            ]
-            return logs
-        else:
-            try_numbers = [try_number]
-
-        logs = [''] * len(try_numbers)
-        for i, try_number in enumerate(try_numbers):
-            logs[i] += self._read(task_instance, try_number)
-
-        return logs
-
-    def _init_file(self, ti):
-        """
-        Create log directory and give it correct permissions.
-        :param ti: task instance object
-        :return relative log path of the given task instance
-        """
-        # To handle log writing when tasks are impersonated, the log files need to
-        # be writable by the user that runs the Airflow command and the user
-        # that is impersonated. This is mainly to handle corner cases with the
-        # SubDagOperator. When the SubDagOperator is run, all of the operators
-        # run under the impersonated user and create appropriate log files
-        # as the impersonated user. However, if the user manually runs tasks
-        # of the SubDagOperator through the UI, then the log files are created
-        # by the user that runs the Airflow command. For example, the Airflow
-        # run command may be run by the `airflow_sudoable` user, but the Airflow
-        # tasks may be run by the `airflow` user. If the log files are not
-        # writable by both users, then it's possible that re-running a task
-        # via the UI (or vice versa) results in a permission error as the task
-        # tries to write to a log file created by the other user.
-        relative_path = self._render_filename(ti, ti.try_number)
-        full_path = os.path.join(self.local_base, relative_path)
-        directory = os.path.dirname(full_path)
-        # Create the log file and give it group writable permissions
-        # TODO(aoen): Make log dirs and logs globally readable for now since the SubDag
-        # operator is not compatible with impersonation (e.g. if a Celery executor is used
-        # for a SubDag operator and the SubDag operator has a different owner than the
-        # parent DAG)
-        if not os.path.exists(directory):
-            # Create the directory as globally writable using custom mkdirs
-            # as os.makedirs doesn't set mode properly.
-            mkdirs(directory, 0o775)
-
-        if not os.path.exists(full_path):
-            open(full_path, "a").close()
-            # TODO: Investigate using 444 instead of 666.
-            os.chmod(full_path, 0o666)
-
-        return full_path
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+import requests
+
+from jinja2 import Template
+
+from airflow import configuration as conf
+from airflow.configuration import AirflowConfigException
+from airflow.utils.file import mkdirs
+
+
+class FileTaskHandler(logging.Handler):
+    """
+    FileTaskHandler is a python log handler that handles and reads
+    task instance logs. It creates and delegates log handling
+    to `logging.FileHandler` after receiving task instance context.
+    It reads logs from task instance's host machine.
+    """
+
+    def __init__(self, base_log_folder, filename_template):
+        """
+        :param base_log_folder: Base log folder to place logs.
+        :param filename_template: template filename string
+        """
+        super(FileTaskHandler, self).__init__()
+        self.handler = None
+        self.local_base = base_log_folder
+        self.log_relative_path = ''
+        self.filename_template = filename_template
+        self.filename_jinja_template = None
+
+        if "{{" in self.filename_template: #jinja mode
+            self.filename_jinja_template = Template(self.filename_template)
+
+    def set_context(self, ti):
+        """
+        Provide task_instance context to airflow task handler.
+        :param ti: task instance object
+        """
+        self.log_relative_path = self._render_filename(ti, ti.try_number)
+
+        local_loc = self._init_file()
+        self.handler = logging.FileHandler(local_loc)
+        self.handler.setFormatter(self.formatter)
+        self.handler.setLevel(self.level)
+
+    def emit(self, record):
+        if self.handler is not None:
+            self.handler.emit(record)
+
+    def flush(self):
+        if self.handler is not None:
+            self.handler.flush()
+
+    def close(self):
+        if self.handler is not None:
+            self.handler.close()
+
+    def _render_filename(self, ti, try_number):
+        if self.filename_jinja_template:
+            jinja_context = ti.get_template_context()
+            jinja_context['try_number'] = try_number
+            return self.filename_jinja_template.render(**jinja_context)
+
+        return self.filename_template.format(dag_id=ti.dag_id,
+                                             task_id=ti.task_id,
+                                             execution_date=ti.execution_date.isoformat(),
+                                             try_number=try_number)
+
+    def _read(self, ti, try_number):
+        """
+        Template method that contains custom logic of reading
+        logs given the try_number.
+        :param ti: task instance record
+        :param try_number: current try_number to read log from
+        :return: log message as a string
+        """
+        # Task instance here might be different from task instance when
+        # initializing the handler. Thus explicitly getting log location
+        # is needed to get correct log path.
+        log_relative_path = self._render_filename(ti, try_number)
+        location = os.path.join(self.local_base, log_relative_path)
+
+        log = ""
+
+        if os.path.exists(location):
+            try:
+                with open(location) as f:
+                    log += "*** Reading local file: {}\n".format(location)
+                    log += "".join(f.readlines())
+            except Exception as e:
+                log = "*** Failed to load local log file: {}\n".format(location)
+                log += "*** {}\n".format(str(e))
+        else:
+            url = os.path.join(
+                "http://{ti.hostname}:{worker_log_server_port}/log", log_relative_path
+            ).format(
+                ti=ti,
+                worker_log_server_port=conf.get('celery', 'WORKER_LOG_SERVER_PORT')
+            )
+            log += "*** Log file does not exist: {}\n".format(location)
+            log += "*** Fetching from: {}\n".format(url)
+            try:
+                timeout = None  # No timeout
+                try:
+                    timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
+                except (AirflowConfigException, ValueError):
+                    pass
+
+                response = requests.get(url, timeout=timeout)
+
+                # Check if the resource was properly fetched
+                response.raise_for_status()
+
+                log += '\n' + response.text
+            except Exception as e:
+                log += "*** Failed to fetch log file from worker. {}\n".format(str(e))
+
+        return log
+
+    def read(self, task_instance, try_number=None):
+        """
+        Read logs of given task instance from local machine.
+        :param task_instance: task instance object
+        :param try_number: task instance try_number to read logs from. If None
+                           it returns all logs separated by try_number
+        :return: a list of logs
+        """
+        # Task instance increments its try number when it starts to run.
+        # So the log for a particular task try will only show up when
+        # try number gets incremented in DB, i.e logs produced the time
+        # after cli run and before try_number + 1 in DB will not be displayed.
+
+        if try_number is None:
+            next_try = task_instance.next_try_number
+            try_numbers = list(range(1, next_try))
+        elif try_number < 1:
+            logs = [
+                'Error fetching the logs. Try number {} is invalid.'.format(try_number),
+            ]
+            return logs
+        else:
+            try_numbers = [try_number]
+
+        logs = [''] * len(try_numbers)
+        for i, try_number in enumerate(try_numbers):
+            logs[i] += self._read(task_instance, try_number)
+
+        return logs
+
+    def _init_file(self):
+        """
+        Create log directory and give it correct permissions.
+        :return absolute log path of the given task instance
+        """
+        return self._init_file_path(os.path.join(self.local_base,
+                                                 self.log_relative_path))
+
+    def _init_file_path(self, full_path):
+        # To handle log writing when tasks are impersonated, the log files need to
+        # be writable by the user that runs the Airflow command and the user
+        # that is impersonated. This is mainly to handle corner cases with the
+        # SubDagOperator. When the SubDagOperator is run, all of the operators
+        # run under the impersonated user and create appropriate log files
+        # as the impersonated user. However, if the user manually runs tasks
+        # of the SubDagOperator through the UI, then the log files are created
+        # by the user that runs the Airflow command. For example, the Airflow
+        # run command may be run by the `airflow_sudoable` user, but the Airflow
+        # tasks may be run by the `airflow` user. If the log files are not
+        # writable by both users, then it's possible that re-running a task
+        # via the UI (or vice versa) results in a permission error as the task
+        # tries to write to a log file created by the other user.
+
+        directory = os.path.dirname(full_path)
+        # Create the log file and give it group writable permissions
+        # TODO(aoen): Make log dirs and logs globally readable for now since the SubDag
+        # operator is not compatible with impersonation (e.g. if a Celery executor is used
+        # for a SubDag operator and the SubDag operator has a different owner than the
+        # parent DAG)
+        if not os.path.exists(directory):
+            # Create the directory as globally writable using custom mkdirs
+            # as os.makedirs doesn't set mode properly.
+            mkdirs(directory, 0o775)
+
+        if not os.path.exists(full_path):
+            open(full_path, "a").close()
+            # TODO: Investigate using 444 instead of 666.
+            os.chmod(full_path, 0o666)
+
+        return full_path
diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py
index 3b83c8cd1f..08bbbff24e 100644
--- a/airflow/utils/log/gcs_task_handler.py
+++ b/airflow/utils/log/gcs_task_handler.py
@@ -29,7 +29,6 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
     def __init__(self, base_log_folder, gcs_log_folder, filename_template):
         super(GCSTaskHandler, self).__init__(base_log_folder, filename_template)
         self.remote_base = gcs_log_folder
-        self.log_relative_path = ''
         self._hook = None
         self.closed = False
         self.upload_on_close = True
@@ -59,7 +58,6 @@ def set_context(self, ti):
         # Log relative path is used to construct local and remote
         # log path to upload log files into GCS and read from the
         # remote location.
-        self.log_relative_path = self._render_filename(ti, ti.try_number)
         self.upload_on_close = not ti.is_raw
 
     def close(self):
diff --git a/airflow/utils/log/remote_file_task_handler.py b/airflow/utils/log/remote_file_task_handler.py
new file mode 100644
index 0000000000..d9368f4667
--- /dev/null
+++ b/airflow/utils/log/remote_file_task_handler.py
@@ -0,0 +1,84 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import shutil
+
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.log.file_task_handler import FileTaskHandler
+
+
+class RemoteFileTaskHandler(FileTaskHandler, LoggingMixin):
+    """
+    RemoteFileTaskHandler is a python log handler that handles and reads
+    task instance logs. It provides the same functionality as the
+    FileTaskHandler, but copies the log file to a second log folder on close.
+    This can be useful for archiving purposes, or when dealing with
+    mounted cloud storage which is paid per action.
+    """
+    def __init__(self, base_log_folder, secondary_log_folder, filename_template):
+        super(RemoteFileTaskHandler, self).__init__(base_log_folder, filename_template)
+        self.secondary_log_folder = secondary_log_folder
+        self.closed = False
+
+    def close(self):
+        """
+        Close and copy the local log file to the secondary location
+        """
+        # When the application exits, the system shuts down all handlers by
+        # calling their close method. Here we check if the handler is already
+        # closed to prevent copying the log to the secondary location multiple
+        # times when `logging.shutdown` is called.
+        if self.closed:
+            return
+
+        super(RemoteFileTaskHandler, self).close()
+
+        primary_loc = os.path.join(self.local_base, self.log_relative_path)
+        secondary_loc = os.path.join(self.secondary_log_folder, self.log_relative_path)
+        if os.path.exists(primary_loc):
+            shutil.copyfile(primary_loc, secondary_loc)
+
+        # Mark closed so we don't double write if close is called twice
+        self.closed = True
+
+    def _init_file(self):
+        self._init_file_path(os.path.join(self.secondary_log_folder,
+                                          self.log_relative_path))
+
+        return super(RemoteFileTaskHandler, self)._init_file()
+
+    def _read(self, ti, try_number):
+        """
+        Read logs of given task instance and try_number from the secondary location.
+        If that does not exist, fallback to the default FileTaskHandler.
+        :param ti: task instance object
+        :param try_number: task instance try_number to read logs from
+        """
+        # Explicitly getting log relative path is necessary as the given
+        # task instance might be different than task instance passed in
+        # in set_context method.
+        log_relative_path = self._render_filename(ti, try_number + 1)
+        secondary_loc = os.path.join(self.secondary_log_folder, log_relative_path)
+
+        if os.path.exists(secondary_loc):
+            try:
+                with open(secondary_loc) as f:
+                    log = "*** Reading secondary log.\n" + "".join(f.readlines())
+            except Exception as e:
+                log = "*** Failed to load secondary log file: {}. {}\n" \
+                    .format(secondary_loc, str(e))
+        else:
+            log = super(RemoteFileTaskHandler, self)._read(ti, try_number)
+
+        return log
diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py
index b3acf3a62e..c32ff2faa8 100644
--- a/airflow/utils/log/s3_task_handler.py
+++ b/airflow/utils/log/s3_task_handler.py
@@ -54,7 +54,6 @@ def set_context(self, ti):
         super(S3TaskHandler, self).set_context(ti)
         # Local location and remote location is needed to open and
         # upload local log file to S3 remote storage.
-        self.log_relative_path = self._render_filename(ti, ti.try_number)
         self.upload_on_close = not ti.is_raw
 
     def close(self):
diff --git a/tests/utils/log/test_remote_file_task_handler.py b/tests/utils/log/test_remote_file_task_handler.py
new file mode 100644
index 0000000000..0b552886de
--- /dev/null
+++ b/tests/utils/log/test_remote_file_task_handler.py
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import shutil
+import os
+import unittest
+
+from airflow.utils.log.remote_file_task_handler import RemoteFileTaskHandler
+from datetime import datetime
+from datetime import timedelta
+
+
+class TestRemoteFileTaskHandler(unittest.TestCase):
+    def setUp(self):
+        super(TestRemoteFileTaskHandler, self).setUp()
+        self.base_log_folder = "/tmp/log_test"
+        self.remote_base_log_folder = "/tmp/log_test2"
+        self.filename_template = "output.log"
+
+    def test_remote_close(self):
+        rfth = RemoteFileTaskHandler(self.base_log_folder,
+                                     self.remote_base_log_folder,
+                                     self.filename_template)
+        rfth.log_relative_path = self.filename_template
+        full_path = rfth._init_file()
+
+        with open(full_path, 'w') as fp:
+            fp.write('testing')
+
+        rfth.close()
+
+        secondary_path = os.path.join(self.remote_base_log_folder, self.filename_template)
+        self.assertTrue(os.path.exists(secondary_path))
+
+        with open(secondary_path) as fp:
+            line = fp.read()
+            self.assertEqual(line, 'testing')
+
+    def tearDown(self):
+        shutil.rmtree(self.base_log_folder, ignore_errors=True)
+        shutil.rmtree(self.remote_base_log_folder, ignore_errors=True)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Remote file handler for logging
> -------------------------------
>
>                 Key: AIRFLOW-1775
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1775
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: logging
>            Reporter: Niels Zeilemaker
>            Priority: Major
>
> We're using a mounted Azure file share to store our log files. Currently, Airflow is writing its logs to that file share. However, this is making the solution quite expensive, as you pay per action on Azure file shares.
> If we had a remote_file_task_logging handler, we could mimic the s3_task_logging handler and copy the file to the file share in the close method, reducing the cost while still providing persistent storage.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)