You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by gi...@apache.org on 2020/12/29 13:18:58 UTC

[buildstream] 02/06: cascache: extract CASDProcess in new module

This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch aevri/casdprocessmanager2
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit caff94097cbaa3be3463e19c6ba2e6276d86db4e
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Tue Oct 8 17:14:32 2019 +0100

    cascache: extract CASDProcess in new module
    
    Make it easier to specialize handling of the buildbox-casd process on
    Windows, by splitting it into its own class. This allows us to
    encapsulate some decisions, and decreases the complexity of the CASCache
    class.
    
    Take some of the complexity out of this file by splitting the
    responsibility of managing the process out to another file.
---
 src/buildstream/_cas/cascache.py           | 168 +++++-----------------
 src/buildstream/_cas/casdprocessmanager.py | 220 +++++++++++++++++++++++++++++
 src/buildstream/_scheduler/scheduler.py    |  22 +--
 src/buildstream/_stream.py                 |   2 +-
 4 files changed, 262 insertions(+), 150 deletions(-)

diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index 4ee575d..091b14e 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -25,10 +25,7 @@ import errno
 import contextlib
 import ctypes
 import multiprocessing
-import shutil
 import signal
-import subprocess
-import tempfile
 import time
 
 import grpc
@@ -40,8 +37,8 @@ from .._protos.build.buildgrid import local_cas_pb2, local_cas_pb2_grpc
 from .. import _signals, utils
 from ..types import FastEnum
 from .._exceptions import CASCacheError
-from .._message import Message, MessageType
 
+from .casdprocessmanager import CASDProcessManager
 from .casremote import _CASBatchRead, _CASBatchUpdate
 
 _BUFFER_SIZE = 65536
@@ -50,8 +47,6 @@ _BUFFER_SIZE = 65536
 # Refresh interval for disk usage of local cache in seconds
 _CACHE_USAGE_REFRESH = 5
 
-_CASD_MAX_LOGFILES = 10
-
 
 class CASLogLevel(FastEnum):
     WARNING = "warning"
@@ -86,45 +81,21 @@ class CASCache():
         self._cache_usage_monitor_forbidden = False
 
         if casd:
-            # Place socket in global/user temporary directory to avoid hitting
-            # the socket path length limit.
-            self._casd_socket_tempdir = tempfile.mkdtemp(prefix='buildstream')
-            self._casd_socket_path = os.path.join(self._casd_socket_tempdir, 'casd.sock')
-
-            casd_args = [utils.get_host_tool('buildbox-casd')]
-            casd_args.append('--bind=unix:' + self._casd_socket_path)
-            casd_args.append('--log-level=' + log_level.value)
-
-            if cache_quota is not None:
-                casd_args.append('--quota-high={}'.format(int(cache_quota)))
-                casd_args.append('--quota-low={}'.format(int(cache_quota / 2)))
-
-                if protect_session_blobs:
-                    casd_args.append('--protect-session-blobs')
-
-            casd_args.append(path)
-
-            self._casd_start_time = time.time()
-            self.casd_logfile = self._rotate_and_get_next_logfile()
-
-            with open(self.casd_logfile, "w") as logfile_fp:
-                # Block SIGINT on buildbox-casd, we don't need to stop it
-                # The frontend will take care of it if needed
-                with _signals.blocked([signal.SIGINT], ignore=False):
-                    self._casd_process = subprocess.Popen(
-                        casd_args, cwd=path, stdout=logfile_fp, stderr=subprocess.STDOUT)
+            log_dir = os.path.join(self.casdir, "logs")
+            self._casd_process_manager = CASDProcessManager(
+                path, log_dir, log_level, cache_quota, protect_session_blobs)
 
             self._cache_usage_monitor = _CASCacheUsageMonitor(self)
         else:
-            self._casd_process = None
+            self._casd_process_manager = None
 
     def __getstate__(self):
         state = self.__dict__.copy()
 
         # Popen objects are not pickle-able, however, child processes only
-        # need the information whether a casd subprocess was started or not.
-        assert '_casd_process' in state
-        state['_casd_process'] = bool(self._casd_process)
+        # need some information from the manager, so we can use a proxy.
+        if state['_casd_process_manager'] is not None:
+            state['_casd_process_manager'] = _LimitedCASDProcessManagerProxy(self._casd_process_manager)
 
         # The usage monitor is not pickle-able, but we also don't need it in
         # child processes currently. Make sure that if this changes, we get a
@@ -137,18 +108,18 @@ class CASCache():
         return state
 
     def _init_casd(self):
-        assert self._casd_process, "CASCache was instantiated without buildbox-casd"
+        assert self._casd_process_manager, "CASCache was instantiated without buildbox-casd"
 
         if not self._casd_channel:
-            while not os.path.exists(self._casd_socket_path):
+            while not os.path.exists(self._casd_process_manager.socket_path):
                 # casd is not ready yet, try again after a 10ms delay,
                 # but don't wait for more than 15s
-                if time.time() > self._casd_start_time + 15:
+                if time.time() > self._casd_process_manager.start_time + 15:
                     raise CASCacheError("Timed out waiting for buildbox-casd to become ready")
 
                 time.sleep(0.01)
 
-            self._casd_channel = grpc.insecure_channel('unix:' + self._casd_socket_path)
+            self._casd_channel = grpc.insecure_channel('unix:' + self._casd_process_manager.socket_path)
             self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
             self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
 
@@ -211,10 +182,11 @@ class CASCache():
         if self._cache_usage_monitor:
             self._cache_usage_monitor.release_resources()
 
-        if self._casd_process:
+        if self._casd_process_manager:
             self.close_grpc_channels()
-            self._terminate_casd_process(messenger)
-            shutil.rmtree(self._casd_socket_tempdir)
+            self._casd_process_manager.terminate(messenger)
+            self._casd_process_manager.clean_up()
+            self._casd_process_manager = None
 
     # contains():
     #
@@ -691,30 +663,6 @@ class CASCache():
     #             Local Private Methods            #
     ################################################
 
-    # _rotate_and_get_next_logfile()
-    #
-    # Get the logfile to use for casd
-    #
-    # This will ensure that we don't create too many casd log files by
-    # rotating the logs and only keeping _CASD_MAX_LOGFILES logs around.
-    #
-    # Returns:
-    #   (str): the path to the log file to use
-    #
-    def _rotate_and_get_next_logfile(self):
-        log_dir = os.path.join(self.casdir, "logs")
-
-        try:
-            existing_logs = sorted(os.listdir(log_dir))
-        except FileNotFoundError:
-            os.makedirs(log_dir)
-        else:
-            while len(existing_logs) >= _CASD_MAX_LOGFILES:
-                logfile_to_delete = existing_logs.pop(0)
-                os.remove(os.path.join(log_dir, logfile_to_delete))
-
-        return os.path.join(log_dir, str(self._casd_start_time) + ".log")
-
     def _refpath(self, ref):
         return os.path.join(self.casdir, 'refs', 'heads', ref)
 
@@ -983,67 +931,6 @@ class CASCache():
         # Upload any blobs missing on the server
         self.send_blobs(remote, missing_blobs)
 
-    # _terminate_casd_process()
-    #
-    # Terminate the buildbox casd process
-    #
-    # Args:
-    #   messenger (buildstream._messenger.Messenger): Messenger to forward information to the frontend
-    #
-    def _terminate_casd_process(self, messenger=None):
-        return_code = self._casd_process.poll()
-
-        if return_code is not None:
-            # buildbox-casd is already dead
-            self._casd_process = None
-
-            if messenger:
-                messenger.message(
-                    Message(
-                        MessageType.BUG,
-                        "Buildbox-casd died during the run. Exit code: {}, Logs: {}".format(
-                            return_code, self.casd_logfile
-                        ),
-                    )
-                )
-            return
-
-        self._casd_process.terminate()
-
-        try:
-            # Don't print anything if buildbox-casd terminates quickly
-            return_code = self._casd_process.wait(timeout=0.5)
-        except subprocess.TimeoutExpired:
-            if messenger:
-                cm = messenger.timed_activity("Terminating buildbox-casd")
-            else:
-                cm = contextlib.suppress()
-            with cm:
-                try:
-                    return_code = self._casd_process.wait(timeout=15)
-                except subprocess.TimeoutExpired:
-                    self._casd_process.kill()
-                    self._casd_process.wait(timeout=15)
-
-                    if messenger:
-                        messenger.message(
-                            Message(MessageType.WARN, "Buildbox-casd didn't exit in time and has been killed")
-                        )
-                    self._casd_process = None
-                    return
-
-        if return_code != 0 and messenger:
-            messenger.message(
-                Message(
-                    MessageType.BUG,
-                    "Buildbox-casd didn't exit cleanly. Exit code: {}, Logs: {}".format(
-                        return_code, self.casd_logfile
-                    ),
-                )
-            )
-
-        self._casd_process = None
-
     # get_cache_usage():
     #
     # Fetches the current usage of the CAS local cache.
@@ -1055,16 +942,16 @@ class CASCache():
         assert not self._cache_usage_monitor_forbidden
         return self._cache_usage_monitor.get_cache_usage()
 
-    # get_casd_process()
+    # get_casd_process_manager()
     #
     # Get the underlying buildbox-casd process
     #
     # Returns:
     #   (subprocess.Process): The casd process that is used for the current cascache
     #
-    def get_casd_process(self):
-        assert self._casd_process is not None, "This should only be called with a running buildbox-casd process"
-        return self._casd_process
+    def get_casd_process_manager(self):
+        assert self._casd_process_manager is not None, "Only call this with a running buildbox-casd process"
+        return self._casd_process_manager
 
 
 # _CASCacheUsage
@@ -1172,3 +1059,18 @@ def _grouper(iterable, n):
         except StopIteration:
             return
         yield itertools.chain([current], itertools.islice(iterable, n - 1))
+
+
+# _LimitedCASDProcessManagerProxy
+#
+# This can stand-in for an owning CASDProcessManager, for some functions. This
+# is useful when pickling objects that contain a CASDProcessManager - as long
+# as the lifetime of the original exceeds this proxy.
+#
+# Args:
+#     casd_process_manager (CASDProcessManager): The manager to proxy
+#
+class _LimitedCASDProcessManagerProxy:
+    def __init__(self, casd_process_manager):
+        self.socket_path = casd_process_manager.socket_path
+        self.start_time = casd_process_manager.start_time
diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py
new file mode 100644
index 0000000..697b21f
--- /dev/null
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -0,0 +1,220 @@
+#
+#  Copyright (C) 2018 Codethink Limited
+#  Copyright (C) 2018-2019 Bloomberg Finance LP
+#
+#  This program is free software; you can redistribute it and/or
+#  modify it under the terms of the GNU Lesser General Public
+#  License as published by the Free Software Foundation; either
+#  version 2 of the License, or (at your option) any later version.
+#
+#  This library is distributed in the hope that it will be useful,
+#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
+#  Lesser General Public License for more details.
+#
+#  You should have received a copy of the GNU Lesser General Public
+#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import asyncio
+import contextlib
+import os
+import shutil
+import signal
+import subprocess
+import tempfile
+import time
+
+from .. import _signals, utils
+from .._message import Message, MessageType
+
+_CASD_MAX_LOGFILES = 10
+
+
+# CASDProcessManager
+#
+# This manages the subprocess that runs buildbox-casd.
+#
+# Args:
+#     path (str): The root directory for the CAS repository
+#     log_dir (str): The directory for the logs
+#     log_level (LogLevel): Log level to give to buildbox-casd for logging
+#     cache_quota (int): User configured cache quota
+#     protect_session_blobs (bool): Disable expiry for blobs used in the current session
+#
+class CASDProcessManager:
+
+    def __init__(self, path, log_dir, log_level, cache_quota, protect_session_blobs):
+        self._log_dir = log_dir
+
+        # Place socket in global/user temporary directory to avoid hitting
+        # the socket path length limit.
+        self._socket_tempdir = tempfile.mkdtemp(prefix='buildstream')
+        self.socket_path = os.path.join(self._socket_tempdir, 'casd.sock')
+
+        casd_args = [utils.get_host_tool('buildbox-casd')]
+        casd_args.append('--bind=unix:' + self.socket_path)
+        casd_args.append('--log-level=' + log_level.value)
+
+        if cache_quota is not None:
+            casd_args.append('--quota-high={}'.format(int(cache_quota)))
+            casd_args.append('--quota-low={}'.format(int(cache_quota / 2)))
+
+            if protect_session_blobs:
+                casd_args.append('--protect-session-blobs')
+
+        casd_args.append(path)
+
+        self.start_time = time.time()
+        self._logfile = self._rotate_and_get_next_logfile()
+
+        with open(self._logfile, "w") as logfile_fp:
+            # Block SIGINT on buildbox-casd, we don't need to stop it
+            # The frontend will take care of it if needed
+            with _signals.blocked([signal.SIGINT], ignore=False):
+                self._process = subprocess.Popen(
+                    casd_args, cwd=path, stdout=logfile_fp, stderr=subprocess.STDOUT)
+
+        self._failure_callback = None
+        self._watcher = None
+
+    # _rotate_and_get_next_logfile()
+    #
+    # Get the logfile to use for casd
+    #
+    # This will ensure that we don't create too many casd log files by
+    # rotating the logs and only keeping _CASD_MAX_LOGFILES logs around.
+    #
+    # Returns:
+    #   (str): the path to the log file to use
+    #
+    def _rotate_and_get_next_logfile(self):
+        try:
+            existing_logs = sorted(os.listdir(self._log_dir))
+        except FileNotFoundError:
+            os.makedirs(self._log_dir)
+        else:
+            while len(existing_logs) >= _CASD_MAX_LOGFILES:
+                logfile_to_delete = existing_logs.pop(0)
+                os.remove(os.path.join(self._log_dir, logfile_to_delete))
+
+        return os.path.join(self._log_dir, str(self.start_time) + ".log")
+
+    # terminate()
+    #
+    # Terminate the buildbox casd process
+    #
+    # Args:
+    #   messenger (buildstream._messenger.Messenger): Messenger to forward information to the frontend
+    #
+    def terminate(self, messenger=None):
+        assert self._watcher is None
+        assert self._failure_callback is None
+
+        return_code = self._process.poll()
+
+        if return_code is not None:
+            # buildbox-casd is already dead
+            self._process = None
+
+            if messenger:
+                messenger.message(
+                    Message(
+                        MessageType.BUG,
+                        "Buildbox-casd died during the run. Exit code: {}, Logs: {}".format(
+                            return_code, self._logfile
+                        ),
+                    )
+                )
+            return
+
+        self._process.terminate()
+
+        try:
+            # Don't print anything if buildbox-casd terminates quickly
+            return_code = self._process.wait(timeout=0.5)
+        except subprocess.TimeoutExpired:
+            if messenger:
+                cm = messenger.timed_activity("Terminating buildbox-casd")
+            else:
+                cm = contextlib.suppress()
+            with cm:
+                try:
+                    return_code = self._process.wait(timeout=15)
+                except subprocess.TimeoutExpired:
+                    self._process.kill()
+                    self._process.wait(timeout=15)
+
+                    if messenger:
+                        messenger.message(
+                            Message(MessageType.WARN, "Buildbox-casd didn't exit in time and has been killed")
+                        )
+                    self._process = None
+                    return
+
+        if return_code != 0 and messenger:
+            messenger.message(
+                Message(
+                    MessageType.BUG,
+                    "Buildbox-casd didn't exit cleanly. Exit code: {}, Logs: {}".format(
+                        return_code, self._logfile
+                    ),
+                )
+            )
+
+    # clean_up()
+    #
+    # After termination, clean up any additional resources
+    #
+    def clean_up(self):
+        shutil.rmtree(self._socket_tempdir)
+
+    # set_failure_callback()
+    #
+    # Call this function if the CASD process stops unexpectedly.
+    #
+    # Note that we guarantee that the lifetime of any 'watcher' used is bound
+    # to the lifetime of the callback - we won't hang on to the asyncio loop
+    # longer than necessary.
+    #
+    # We won't be able to use watchers on win32, so we'll need to support
+    # another approach.
+    #
+    # Args:
+    #   func (callable): a callable that takes no parameters
+    #
+    def set_failure_callback(self, func):
+        assert func is not None
+        assert self._watcher is None
+        assert self._failure_callback is None
+        self._failure_callback = func
+        self._watcher = asyncio.get_child_watcher()
+        self._watcher.add_child_handler(self._process.pid, self._on_casd_failure)
+
+    # clear_failure_callback()
+    #
+    # No longer call this callable if the CASD process stops unexpectedly
+    #
+    # Args:
+    #   func (callable): The callable that was provided to add_failure_callback().
+    #                    Supplying this again allows us to do error checking.
+    #
+    def clear_failure_callback(self, func):
+        assert func is not None
+        assert self._failure_callback == func
+        self._watcher.remove_child_handler(self._process.pid)
+        self._failure_callback = None
+        self._watcher = None
+
+    # _on_casd_failure()
+    #
+    # Handler for casd process terminating unexpectedly
+    #
+    # Args:
+    #   pid (int): the process id under which buildbox-casd was running
+    #   returncode (int): the return code with which buildbox-casd exited
+    #
+    def _on_casd_failure(self, pid, returncode):
+        assert self._failure_callback is not None
+        self._process.returncode = returncode
+        self._failure_callback()
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index d3faa2a..b8af01c 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -138,7 +138,6 @@ class Scheduler():
         self._suspendtime = None              # Session time compensation for suspended state
         self._queue_jobs = True               # Whether we should continue to queue jobs
         self._state = state
-        self._casd_process = None             # handle to the casd process for monitoring purpose
 
         # Bidirectional queue to send notifications back to the Scheduler's owner
         self._notification_queue = notification_queue
@@ -152,8 +151,8 @@ class Scheduler():
     #
     # Args:
     #    queues (list): A list of Queue objects
-    #    casd_processes (subprocess.Process): The subprocess which runs casd in order to be notified
-    #                                         of failures.
+    #    casd_process_manager (casdprocessmanager.CASDProcessManager): The manager of the buildbox-casd
+    #                                                                   subprocess, used in order to be notified of failures.
     #
     # Returns:
     #    (SchedStatus): How the scheduling terminated
@@ -163,7 +162,7 @@ class Scheduler():
     # elements have been processed by each queue or when
     # an error arises
     #
-    def run(self, queues, casd_process):
+    def run(self, queues, casd_process_manager):
 
         # Hold on to the queues to process
         self.queues = queues
@@ -183,9 +182,7 @@ class Scheduler():
         self._connect_signals()
 
         # Watch casd while running to ensure it doesn't die
-        self._casd_process = casd_process
-        _watcher = asyncio.get_child_watcher()
-        _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
+        casd_process_manager.set_failure_callback(self._abort_on_casd_failure)
 
         # Start the profiler
         with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
@@ -195,8 +192,7 @@ class Scheduler():
             self.loop.close()
 
         # Stop watching casd
-        _watcher.remove_child_handler(casd_process.pid)
-        self._casd_process = None
+        casd_process_manager.clear_failure_callback(self._abort_on_casd_failure)
 
         # Stop handling unix signals
         self._disconnect_signals()
@@ -337,15 +333,9 @@ class Scheduler():
     # This will terminate immediately all jobs, since buildbox-casd is dead,
     # we can't do anything with them anymore.
     #
-    # Args:
-    #   pid (int): the process id under which buildbox-casd was running
-    #   returncode (int): the return code with which buildbox-casd exited
-    #
-    def _abort_on_casd_failure(self, pid, returncode):
+    def _abort_on_casd_failure(self):
         message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.")
         self._notify(Notification(NotificationType.MESSAGE, message=message))
-
-        self._casd_process.returncode = returncode
         self.terminate_jobs()
 
     # _start_job()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index c7ada6e..04fdcfc 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1365,7 +1365,7 @@ class Stream():
         if self._session_start_callback is not None:
             self._session_start_callback()
 
-        status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process())
+        status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process_manager())
 
         if status == SchedStatus.ERROR:
             raise StreamError()