You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by tv...@apache.org on 2021/02/04 08:02:44 UTC

[buildstream] 11/16: cascache: extract CASDProcess in new module

This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch aevri/win32_receive_signals
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 998942215c7a33227ead23bc182f379ef062c47a
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Tue Oct 8 17:14:32 2019 +0100

    cascache: extract CASDProcess in new module
    
    Make it easier to specialize handling of the buildbox-casd process on
    Windows, by splitting it into its own class. This allows us to
    encapsulate some decisions, and decreases the complexity of the CASCache
    class.
    
    Take some of the complexity out of cascache.py by splitting the
    responsibility of managing the process out to another file.
---
 src/buildstream/_cas/cascache.py           | 149 +++----------------
 src/buildstream/_cas/casdprocessmanager.py | 220 +++++++++++++++++++++++++++++
 src/buildstream/_scheduler/scheduler.py    |  22 +--
 src/buildstream/_stream.py                 |   2 +-
 4 files changed, 245 insertions(+), 148 deletions(-)

diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index 83b8e85..7c37d0e 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -25,10 +25,7 @@ import errno
 import contextlib
 import ctypes
 import multiprocessing
-import shutil
 import signal
-import subprocess
-import tempfile
 import time
 
 import grpc
@@ -40,8 +37,8 @@ from .._protos.build.buildgrid import local_cas_pb2, local_cas_pb2_grpc
 from .. import _signals, utils
 from ..types import FastEnum
 from .._exceptions import CASCacheError
-from .._message import Message, MessageType
 
+from .casdprocessmanager import CASDProcessManager
 from .casremote import _CASBatchRead, _CASBatchUpdate
 
 _BUFFER_SIZE = 65536
@@ -50,8 +47,6 @@ _BUFFER_SIZE = 65536
 # Refresh interval for disk usage of local cache in seconds
 _CACHE_USAGE_REFRESH = 5
 
-_CASD_MAX_LOGFILES = 10
-
 
 class CASLogLevel(FastEnum):
     WARNING = "warning"
@@ -80,35 +75,11 @@ class CASCache():
         os.makedirs(self.tmpdir, exist_ok=True)
 
         if casd:
-            # Place socket in global/user temporary directory to avoid hitting
-            # the socket path length limit.
-            self._casd_socket_tempdir = tempfile.mkdtemp(prefix='buildstream')
-            self._casd_socket_path = os.path.join(self._casd_socket_tempdir, 'casd.sock')
-
-            casd_args = [utils.get_host_tool('buildbox-casd')]
-            casd_args.append('--bind=unix:' + self._casd_socket_path)
-            casd_args.append('--log-level=' + log_level.value)
-
-            if cache_quota is not None:
-                casd_args.append('--quota-high={}'.format(int(cache_quota)))
-                casd_args.append('--quota-low={}'.format(int(cache_quota / 2)))
-
-                if protect_session_blobs:
-                    casd_args.append('--protect-session-blobs')
-
-            casd_args.append(path)
-
-            self._casd_start_time = time.time()
-            self.casd_logfile = self._rotate_and_get_next_logfile()
-
-            with open(self.casd_logfile, "w") as logfile_fp:
-                # Block SIGINT on buildbox-casd, we don't need to stop it
-                # The frontend will take care of it if needed
-                with _signals.blocked([signal.SIGINT], ignore=False):
-                    self._casd_process = subprocess.Popen(
-                        casd_args, cwd=path, stdout=logfile_fp, stderr=subprocess.STDOUT)
+            log_dir = os.path.join(self.casdir, "logs")
+            self._casd_process_manager = CASDProcessManager(
+                path, log_dir, log_level, cache_quota, protect_session_blobs)
         else:
-            self._casd_process = None
+            self._casd_process_manager = None
 
         self._casd_channel = None
         self._casd_cas = None
@@ -120,16 +91,16 @@ class CASCache():
 
         # Popen objects are not pickle-able, however, child processes only
         # need the information whether a casd subprocess was started or not.
-        assert '_casd_process' in state
-        state['_casd_process'] = bool(self._casd_process)
+        assert '_casd_process_manager' in state
+        state['_casd_process_manager'] = bool(self._casd_process_manager)
 
         return state
 
     def _init_casd(self):
-        assert self._casd_process, "CASCache was instantiated without buildbox-casd"
+        assert self._casd_process_manager, "CASCache was instantiated without buildbox-casd"
 
         if not self._casd_channel:
-            self._casd_channel = grpc.insecure_channel('unix:' + self._casd_socket_path)
+            self._casd_channel = grpc.insecure_channel('unix:' + self._casd_process_manager.socket_path)
             self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
             self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
 
@@ -143,7 +114,7 @@ class CASCache():
                     if e.code() == grpc.StatusCode.UNAVAILABLE:
                         # casd is not ready yet, try again after a 10ms delay,
                         # but don't wait for more than 15s
-                        if time.time() < self._casd_start_time + 15:
+                        if time.time() < self._casd_process_manager.start_time + 15:
                             time.sleep(1 / 100)
                             continue
 
@@ -204,10 +175,11 @@ class CASCache():
         if self._cache_usage_monitor:
             self._cache_usage_monitor.release_resources()
 
-        if self._casd_process:
+        if self._casd_process_manager:
             self.close_grpc_channels()
-            self._terminate_casd_process(messenger)
-            shutil.rmtree(self._casd_socket_tempdir)
+            self._casd_process_manager.terminate(messenger)
+            self._casd_process_manager.clean_up()
+            self._casd_process_manager = None
 
     # contains():
     #
@@ -684,30 +656,6 @@ class CASCache():
     #             Local Private Methods            #
     ################################################
 
-    # _rotate_and_get_next_logfile()
-    #
-    # Get the logfile to use for casd
-    #
-    # This will ensure that we don't create too many casd log files by
-    # rotating the logs and only keeping _CASD_MAX_LOGFILES logs around.
-    #
-    # Returns:
-    #   (str): the path to the log file to use
-    #
-    def _rotate_and_get_next_logfile(self):
-        log_dir = os.path.join(self.casdir, "logs")
-
-        try:
-            existing_logs = sorted(os.listdir(log_dir))
-        except FileNotFoundError:
-            os.makedirs(log_dir)
-        else:
-            while len(existing_logs) >= _CASD_MAX_LOGFILES:
-                logfile_to_delete = existing_logs.pop(0)
-                os.remove(os.path.join(log_dir, logfile_to_delete))
-
-        return os.path.join(log_dir, str(self._casd_start_time) + ".log")
-
     def _refpath(self, ref):
         return os.path.join(self.casdir, 'refs', 'heads', ref)
 
@@ -976,67 +924,6 @@ class CASCache():
         # Upload any blobs missing on the server
         self.send_blobs(remote, missing_blobs)
 
-    # _terminate_casd_process()
-    #
-    # Terminate the buildbox casd process
-    #
-    # Args:
-    #   messenger (buildstream._messenger.Messenger): Messenger to forward information to the frontend
-    #
-    def _terminate_casd_process(self, messenger=None):
-        return_code = self._casd_process.poll()
-
-        if return_code is not None:
-            # buildbox-casd is already dead
-            self._casd_process = None
-
-            if messenger:
-                messenger.message(
-                    Message(
-                        MessageType.BUG,
-                        "Buildbox-casd died during the run. Exit code: {}, Logs: {}".format(
-                            return_code, self.casd_logfile
-                        ),
-                    )
-                )
-            return
-
-        self._casd_process.terminate()
-
-        try:
-            # Don't print anything if buildbox-casd terminates quickly
-            return_code = self._casd_process.wait(timeout=0.5)
-        except subprocess.TimeoutExpired:
-            if messenger:
-                cm = messenger.timed_activity("Terminating buildbox-casd")
-            else:
-                cm = contextlib.suppress()
-            with cm:
-                try:
-                    return_code = self._casd_process.wait(timeout=15)
-                except subprocess.TimeoutExpired:
-                    self._casd_process.kill()
-                    self._casd_process.wait(timeout=15)
-
-                    if messenger:
-                        messenger.message(
-                            Message(MessageType.WARN, "Buildbox-casd didn't exit in time and has been killed")
-                        )
-                    self._casd_process = None
-                    return
-
-        if return_code != 0 and messenger:
-            messenger.message(
-                Message(
-                    MessageType.BUG,
-                    "Buildbox-casd didn't exit cleanly. Exit code: {}, Logs: {}".format(
-                        return_code, self.casd_logfile
-                    ),
-                )
-            )
-
-        self._casd_process = None
-
     # get_cache_usage():
     #
     # Fetches the current usage of the CAS local cache.
@@ -1050,16 +937,16 @@ class CASCache():
 
         return self._cache_usage_monitor.get_cache_usage()
 
-    # get_casd_process()
+    # get_casd_process_manager()
     #
     # Get the underlying buildbox-casd process
     #
     # Returns:
     #   (subprocess.Process): The casd process that is used for the current cascache
     #
-    def get_casd_process(self):
-        assert self._casd_process is not None, "This should only be called with a running buildbox-casd process"
-        return self._casd_process
+    def get_casd_process_manager(self):
+        assert self._casd_process_manager is not None, "Only call this with a running buildbox-casd process"
+        return self._casd_process_manager
 
 
 # _CASCacheUsage
diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py
new file mode 100644
index 0000000..1ae5e8e
--- /dev/null
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -0,0 +1,220 @@
+#
+#  Copyright (C) 2018 Codethink Limited
+#  Copyright (C) 2018-2019 Bloomberg Finance LP
+#
+#  This program is free software; you can redistribute it and/or
+#  modify it under the terms of the GNU Lesser General Public
+#  License as published by the Free Software Foundation; either
+#  version 2 of the License, or (at your option) any later version.
+#
+#  This library is distributed in the hope that it will be useful,
+#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
+#  Lesser General Public License for more details.
+#
+#  You should have received a copy of the GNU Lesser General Public
+#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import asyncio
+import contextlib
+import os
+import shutil
+import signal
+import subprocess
+import tempfile
+import time
+
+from .. import _signals, utils
+from .._message import Message, MessageType
+
+_CASD_MAX_LOGFILES = 10
+
+
+# CASDProcessManager
+#
+# This manages the subprocess that runs buildbox-casd.
+#
+# Args:
+#     path (str): The root directory for the CAS repository
+#     log_dir (str): The directory for the logs
+#     log_level (LogLevel): Log level to give to buildbox-casd for logging
+#     cache_quota (int): User configured cache quota
+#     protect_session_blobs (bool): Disable expiry for blobs used in the current session
+#
+class CASDProcessManager:
+
+    def __init__(self, path, log_dir, log_level, cache_quota, protect_session_blobs):
+        self._log_dir = log_dir
+
+        # Place socket in global/user temporary directory to avoid hitting
+        # the socket path length limit.
+        self._socket_tempdir = tempfile.mkdtemp(prefix='buildstream')
+        self.socket_path = os.path.join(self._socket_tempdir, 'casd.sock')
+
+        casd_args = [utils.get_host_tool('buildbox-casd')]
+        casd_args.append('--bind=unix:' + self.socket_path)
+        casd_args.append('--log-level=' + log_level.value)
+
+        if cache_quota is not None:
+            casd_args.append('--quota-high={}'.format(int(cache_quota)))
+            casd_args.append('--quota-low={}'.format(int(cache_quota / 2)))
+
+            if protect_session_blobs:
+                casd_args.append('--protect-session-blobs')
+
+        casd_args.append(path)
+
+        self.start_time = time.time()
+        self.logfile = self._rotate_and_get_next_logfile()
+
+        with open(self.logfile, "w") as logfile_fp:
+            # Block SIGINT on buildbox-casd, we don't need to stop it
+            # The frontend will take care of it if needed
+            with _signals.blocked([signal.SIGINT], ignore=False):
+                self._process = subprocess.Popen(
+                    casd_args, cwd=path, stdout=logfile_fp, stderr=subprocess.STDOUT)
+
+        self._failure_callback = None
+        self._watcher = None
+
+    # _rotate_and_get_next_logfile()
+    #
+    # Get the logfile to use for casd
+    #
+    # This will ensure that we don't create too many casd log files by
+    # rotating the logs and only keeping _CASD_MAX_LOGFILES logs around.
+    #
+    # Returns:
+    #   (str): the path to the log file to use
+    #
+    def _rotate_and_get_next_logfile(self):
+        try:
+            existing_logs = sorted(os.listdir(self._log_dir))
+        except FileNotFoundError:
+            os.makedirs(self._log_dir)
+        else:
+            while len(existing_logs) >= _CASD_MAX_LOGFILES:
+                logfile_to_delete = existing_logs.pop(0)
+                os.remove(os.path.join(self._log_dir, logfile_to_delete))
+
+        return os.path.join(self._log_dir, str(self.start_time) + ".log")
+
+    # terminate()
+    #
+    # Terminate the buildbox casd process
+    #
+    # Args:
+    #   messenger (buildstream._messenger.Messenger): Messenger to forward information to the frontend
+    #
+    def terminate(self, messenger=None):
+        assert self._watcher is None
+        assert self._failure_callback is None
+
+        return_code = self._process.poll()
+
+        if return_code is not None:
+            # buildbox-casd is already dead
+            self._process = None
+
+            if messenger:
+                messenger.message(
+                    Message(
+                        MessageType.BUG,
+                        "Buildbox-casd died during the run. Exit code: {}, Logs: {}".format(
+                            return_code, self.logfile
+                        ),
+                    )
+                )
+            return
+
+        self._process.terminate()
+
+        try:
+            # Don't print anything if buildbox-casd terminates quickly
+            return_code = self._process.wait(timeout=0.5)
+        except subprocess.TimeoutExpired:
+            if messenger:
+                cm = messenger.timed_activity("Terminating buildbox-casd")
+            else:
+                cm = contextlib.suppress()
+            with cm:
+                try:
+                    return_code = self._process.wait(timeout=15)
+                except subprocess.TimeoutExpired:
+                    self._process.kill()
+                    self._process.wait(timeout=15)
+
+                    if messenger:
+                        messenger.message(
+                            Message(MessageType.WARN, "Buildbox-casd didn't exit in time and has been killed")
+                        )
+                    self._process = None
+                    return
+
+        if return_code != 0 and messenger:
+            messenger.message(
+                Message(
+                    MessageType.BUG,
+                    "Buildbox-casd didn't exit cleanly. Exit code: {}, Logs: {}".format(
+                        return_code, self.logfile
+                    ),
+                )
+            )
+
+    # clean_up()
+    #
+    # After termination, clean up any additional resources
+    #
+    def clean_up(self):
+        shutil.rmtree(self._socket_tempdir)
+
+    # set_failure_callback()
+    #
+    # Call this function if the CASD process stops unexpectedly.
+    #
+    # Note that we guarantee that the lifetime of any 'watcher' used is bound
+    # to the lifetime of the callback - we won't hang on to the asyncio loop
+    # longer than necessary.
+    #
+    # We won't be able to use watchers on win32, so we'll need to support
+    # another approach.
+    #
+    # Args:
+    #   func (callable): a callable that takes no parameters
+    #
+    def set_failure_callback(self, func):
+        assert func is not None
+        assert self._watcher is None
+        assert self._failure_callback is None, "We only support one callback for now"
+        self._failure_callback = func
+        self._watcher = asyncio.get_child_watcher()
+        self._watcher.add_child_handler(self._process.pid, self._on_casd_failure)
+
+    # clear_failure_callback()
+    #
+    # No longer call this callable if the CASD process stops unexpectedly
+    #
+    # Args:
+    #   func (callable): The callable that was provided to set_failure_callback().
+    #                    Supplying this again allows us to do error checking.
+    #
+    def clear_failure_callback(self, func):
+        assert func is not None
+        assert self._failure_callback == func, "We only support one callback for now"
+        self._watcher.remove_child_handler(self._process.pid)
+        self._failure_callback = None
+        self._watcher = None
+
+    # _on_casd_failure()
+    #
+    # Handler for casd process terminating unexpectedly
+    #
+    # Args:
+    #   pid (int): the process id under which buildbox-casd was running
+    #   returncode (int): the return code with which buildbox-casd exited
+    #
+    def _on_casd_failure(self, pid, returncode):
+        assert self._failure_callback is not None
+        self._process.returncode = returncode
+        self._failure_callback()
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 4c648d2..24086be 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -138,7 +138,6 @@ class Scheduler():
         self._suspendtime = None              # Session time compensation for suspended state
         self._queue_jobs = True               # Whether we should continue to queue jobs
         self._state = state
-        self._casd_process = None             # handle to the casd process for monitoring purpose
 
         # Bidirectional queue to send notifications back to the Scheduler's owner
         self._notification_queue = notification_queue
@@ -152,8 +151,8 @@ class Scheduler():
     #
     # Args:
     #    queues (list): A list of Queue objects
-    #    casd_processes (subprocess.Process): The subprocess which runs casd in order to be notified
-    #                                         of failures.
+    #    casd_process_manager (casdprocessmanager.CASDProcessManager): The manager of the subprocess which runs
+    #                                                                  casd, in order to be notified of failures.
     #
     # Returns:
     #    (SchedStatus): How the scheduling terminated
@@ -163,7 +162,7 @@ class Scheduler():
     # elements have been processed by each queue or when
     # an error arises
     #
-    def run(self, queues, casd_process):
+    def run(self, queues, casd_process_manager):
 
         # Hold on to the queues to process
         self.queues = queues
@@ -183,9 +182,7 @@ class Scheduler():
         self._connect_signals()
 
         # Watch casd while running to ensure it doesn't die
-        self._casd_process = casd_process
-        _watcher = asyncio.get_child_watcher()
-        _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
+        casd_process_manager.set_failure_callback(self._abort_on_casd_failure)
 
         # Start the profiler
         with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
@@ -195,8 +192,7 @@ class Scheduler():
             self.loop.close()
 
         # Stop watching casd
-        _watcher.remove_child_handler(casd_process.pid)
-        self._casd_process = None
+        casd_process_manager.clear_failure_callback(self._abort_on_casd_failure)
 
         # Stop handling unix signals
         self._disconnect_signals()
@@ -338,15 +334,9 @@ class Scheduler():
     # This will terminate immediately all jobs, since buildbox-casd is dead,
     # we can't do anything with them anymore.
     #
-    # Args:
-    #   pid (int): the process id under which buildbox-casd was running
-    #   returncode (int): the return code with which buildbox-casd exited
-    #
-    def _abort_on_casd_failure(self, pid, returncode):
+    def _abort_on_casd_failure(self):
         message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.")
         self._notify(Notification(NotificationType.MESSAGE, message=message))
-
-        self._casd_process.returncode = returncode
         self.terminate_jobs()
 
     # _start_job()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 6e4e5ca..500adb8 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1375,7 +1375,7 @@ class Stream():
         if self._session_start_callback is not None:
             self._session_start_callback()
 
-        status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process())
+        status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process_manager())
 
         if status == SchedStatus.ERROR:
             raise StreamError()