You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by gi...@apache.org on 2020/12/29 13:18:56 UTC

[buildstream] branch aevri/casdprocessmanager2 created (now 5ad7550)

This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch aevri/casdprocessmanager2
in repository https://gitbox.apache.org/repos/asf/buildstream.git.


      at 5ad7550  tests/artifactshare: safer cleanup_on_sigterm use

This branch includes the following new commits:

     new c36382f  cascache.py: Defer attempt to connect to casd until socket file exists
     new caff940  cascache: extract CASDProcess in new module
     new bd98433  CASDProcessManager: 'release_resources' convention
     new 84aab60  Extract casd_channel logic to CASDConnection
     new 71189d4  testutils/artifactshare: don't hang on error
     new 5ad7550  tests/artifactshare: safer cleanup_on_sigterm use

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[buildstream] 01/06: cascache.py: Defer attempt to connect to casd until socket file exists

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch aevri/casdprocessmanager2
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit c36382f15fde4908ef514a171d7a974015049b8b
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Thu Oct 17 09:10:49 2019 +0200

    cascache.py: Defer attempt to connect to casd until socket file exists
    
    gRPC delays reconnect attempts by at least a second. We don't want to
    wait that long. Wait for socket file to appear to avoid the need for
    multiple connect attempts.
---
 src/buildstream/_cas/cascache.py | 22 +++++++++-------------
 1 file changed, 9 insertions(+), 13 deletions(-)

diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index eb28e2d..4ee575d 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -140,25 +140,21 @@ class CASCache():
         assert self._casd_process, "CASCache was instantiated without buildbox-casd"
 
         if not self._casd_channel:
+            while not os.path.exists(self._casd_socket_path):
+                # casd is not ready yet, try again after a 10ms delay,
+                # but don't wait for more than 15s
+                if time.time() > self._casd_start_time + 15:
+                    raise CASCacheError("Timed out waiting for buildbox-casd to become ready")
+
+                time.sleep(0.01)
+
             self._casd_channel = grpc.insecure_channel('unix:' + self._casd_socket_path)
             self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
             self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
 
             # Call GetCapabilities() to establish connection to casd
             capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self._casd_channel)
-            while True:
-                try:
-                    capabilities.GetCapabilities(remote_execution_pb2.GetCapabilitiesRequest())
-                    break
-                except grpc.RpcError as e:
-                    if e.code() == grpc.StatusCode.UNAVAILABLE:
-                        # casd is not ready yet, try again after a 10ms delay,
-                        # but don't wait for more than 15s
-                        if time.time() < self._casd_start_time + 15:
-                            time.sleep(1 / 100)
-                            continue
-
-                    raise
+            capabilities.GetCapabilities(remote_execution_pb2.GetCapabilitiesRequest())
 
     # _get_cas():
     #


[buildstream] 02/06: cascache: extract CASDProcess in new module

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch aevri/casdprocessmanager2
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit caff94097cbaa3be3463e19c6ba2e6276d86db4e
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Tue Oct 8 17:14:32 2019 +0100

    cascache: extract CASDProcess in new module
    
    Make it easier to specialize handling of the buildbox-casd process on
    Windows, by splitting it into its own class. This allows us to
    encapsulate some decisions, and decreases the complexity of the CASCache
    class.
    
    Take some of the complexity out of this file by splitting the
    responsibility of managing the process out to another file.
---
 src/buildstream/_cas/cascache.py           | 168 +++++-----------------
 src/buildstream/_cas/casdprocessmanager.py | 220 +++++++++++++++++++++++++++++
 src/buildstream/_scheduler/scheduler.py    |  22 +--
 src/buildstream/_stream.py                 |   2 +-
 4 files changed, 262 insertions(+), 150 deletions(-)

diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index 4ee575d..091b14e 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -25,10 +25,7 @@ import errno
 import contextlib
 import ctypes
 import multiprocessing
-import shutil
 import signal
-import subprocess
-import tempfile
 import time
 
 import grpc
@@ -40,8 +37,8 @@ from .._protos.build.buildgrid import local_cas_pb2, local_cas_pb2_grpc
 from .. import _signals, utils
 from ..types import FastEnum
 from .._exceptions import CASCacheError
-from .._message import Message, MessageType
 
+from .casdprocessmanager import CASDProcessManager
 from .casremote import _CASBatchRead, _CASBatchUpdate
 
 _BUFFER_SIZE = 65536
@@ -50,8 +47,6 @@ _BUFFER_SIZE = 65536
 # Refresh interval for disk usage of local cache in seconds
 _CACHE_USAGE_REFRESH = 5
 
-_CASD_MAX_LOGFILES = 10
-
 
 class CASLogLevel(FastEnum):
     WARNING = "warning"
@@ -86,45 +81,21 @@ class CASCache():
         self._cache_usage_monitor_forbidden = False
 
         if casd:
-            # Place socket in global/user temporary directory to avoid hitting
-            # the socket path length limit.
-            self._casd_socket_tempdir = tempfile.mkdtemp(prefix='buildstream')
-            self._casd_socket_path = os.path.join(self._casd_socket_tempdir, 'casd.sock')
-
-            casd_args = [utils.get_host_tool('buildbox-casd')]
-            casd_args.append('--bind=unix:' + self._casd_socket_path)
-            casd_args.append('--log-level=' + log_level.value)
-
-            if cache_quota is not None:
-                casd_args.append('--quota-high={}'.format(int(cache_quota)))
-                casd_args.append('--quota-low={}'.format(int(cache_quota / 2)))
-
-                if protect_session_blobs:
-                    casd_args.append('--protect-session-blobs')
-
-            casd_args.append(path)
-
-            self._casd_start_time = time.time()
-            self.casd_logfile = self._rotate_and_get_next_logfile()
-
-            with open(self.casd_logfile, "w") as logfile_fp:
-                # Block SIGINT on buildbox-casd, we don't need to stop it
-                # The frontend will take care of it if needed
-                with _signals.blocked([signal.SIGINT], ignore=False):
-                    self._casd_process = subprocess.Popen(
-                        casd_args, cwd=path, stdout=logfile_fp, stderr=subprocess.STDOUT)
+            log_dir = os.path.join(self.casdir, "logs")
+            self._casd_process_manager = CASDProcessManager(
+                path, log_dir, log_level, cache_quota, protect_session_blobs)
 
             self._cache_usage_monitor = _CASCacheUsageMonitor(self)
         else:
-            self._casd_process = None
+            self._casd_process_manager = None
 
     def __getstate__(self):
         state = self.__dict__.copy()
 
         # Popen objects are not pickle-able, however, child processes only
-        # need the information whether a casd subprocess was started or not.
-        assert '_casd_process' in state
-        state['_casd_process'] = bool(self._casd_process)
+        # need some information from the manager, so we can use a proxy.
+        if state['_casd_process_manager'] is not None:
+            state['_casd_process_manager'] = _LimitedCASDProcessManagerProxy(self._casd_process_manager)
 
         # The usage monitor is not pickle-able, but we also don't need it in
         # child processes currently. Make sure that if this changes, we get a
@@ -137,18 +108,18 @@ class CASCache():
         return state
 
     def _init_casd(self):
-        assert self._casd_process, "CASCache was instantiated without buildbox-casd"
+        assert self._casd_process_manager, "CASCache was instantiated without buildbox-casd"
 
         if not self._casd_channel:
-            while not os.path.exists(self._casd_socket_path):
+            while not os.path.exists(self._casd_process_manager.socket_path):
                 # casd is not ready yet, try again after a 10ms delay,
                 # but don't wait for more than 15s
-                if time.time() > self._casd_start_time + 15:
+                if time.time() > self._casd_process_manager.start_time + 15:
                     raise CASCacheError("Timed out waiting for buildbox-casd to become ready")
 
                 time.sleep(0.01)
 
-            self._casd_channel = grpc.insecure_channel('unix:' + self._casd_socket_path)
+            self._casd_channel = grpc.insecure_channel('unix:' + self._casd_process_manager.socket_path)
             self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
             self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
 
@@ -211,10 +182,11 @@ class CASCache():
         if self._cache_usage_monitor:
             self._cache_usage_monitor.release_resources()
 
-        if self._casd_process:
+        if self._casd_process_manager:
             self.close_grpc_channels()
-            self._terminate_casd_process(messenger)
-            shutil.rmtree(self._casd_socket_tempdir)
+            self._casd_process_manager.terminate(messenger)
+            self._casd_process_manager.clean_up()
+            self._casd_process_manager = None
 
     # contains():
     #
@@ -691,30 +663,6 @@ class CASCache():
     #             Local Private Methods            #
     ################################################
 
-    # _rotate_and_get_next_logfile()
-    #
-    # Get the logfile to use for casd
-    #
-    # This will ensure that we don't create too many casd log files by
-    # rotating the logs and only keeping _CASD_MAX_LOGFILES logs around.
-    #
-    # Returns:
-    #   (str): the path to the log file to use
-    #
-    def _rotate_and_get_next_logfile(self):
-        log_dir = os.path.join(self.casdir, "logs")
-
-        try:
-            existing_logs = sorted(os.listdir(log_dir))
-        except FileNotFoundError:
-            os.makedirs(log_dir)
-        else:
-            while len(existing_logs) >= _CASD_MAX_LOGFILES:
-                logfile_to_delete = existing_logs.pop(0)
-                os.remove(os.path.join(log_dir, logfile_to_delete))
-
-        return os.path.join(log_dir, str(self._casd_start_time) + ".log")
-
     def _refpath(self, ref):
         return os.path.join(self.casdir, 'refs', 'heads', ref)
 
@@ -983,67 +931,6 @@ class CASCache():
         # Upload any blobs missing on the server
         self.send_blobs(remote, missing_blobs)
 
-    # _terminate_casd_process()
-    #
-    # Terminate the buildbox casd process
-    #
-    # Args:
-    #   messenger (buildstream._messenger.Messenger): Messenger to forward information to the frontend
-    #
-    def _terminate_casd_process(self, messenger=None):
-        return_code = self._casd_process.poll()
-
-        if return_code is not None:
-            # buildbox-casd is already dead
-            self._casd_process = None
-
-            if messenger:
-                messenger.message(
-                    Message(
-                        MessageType.BUG,
-                        "Buildbox-casd died during the run. Exit code: {}, Logs: {}".format(
-                            return_code, self.casd_logfile
-                        ),
-                    )
-                )
-            return
-
-        self._casd_process.terminate()
-
-        try:
-            # Don't print anything if buildbox-casd terminates quickly
-            return_code = self._casd_process.wait(timeout=0.5)
-        except subprocess.TimeoutExpired:
-            if messenger:
-                cm = messenger.timed_activity("Terminating buildbox-casd")
-            else:
-                cm = contextlib.suppress()
-            with cm:
-                try:
-                    return_code = self._casd_process.wait(timeout=15)
-                except subprocess.TimeoutExpired:
-                    self._casd_process.kill()
-                    self._casd_process.wait(timeout=15)
-
-                    if messenger:
-                        messenger.message(
-                            Message(MessageType.WARN, "Buildbox-casd didn't exit in time and has been killed")
-                        )
-                    self._casd_process = None
-                    return
-
-        if return_code != 0 and messenger:
-            messenger.message(
-                Message(
-                    MessageType.BUG,
-                    "Buildbox-casd didn't exit cleanly. Exit code: {}, Logs: {}".format(
-                        return_code, self.casd_logfile
-                    ),
-                )
-            )
-
-        self._casd_process = None
-
     # get_cache_usage():
     #
     # Fetches the current usage of the CAS local cache.
@@ -1055,16 +942,16 @@ class CASCache():
         assert not self._cache_usage_monitor_forbidden
         return self._cache_usage_monitor.get_cache_usage()
 
-    # get_casd_process()
+    # get_casd_process_manager()
     #
     # Get the underlying buildbox-casd process
     #
     # Returns:
     #   (subprocess.Process): The casd process that is used for the current cascache
     #
-    def get_casd_process(self):
-        assert self._casd_process is not None, "This should only be called with a running buildbox-casd process"
-        return self._casd_process
+    def get_casd_process_manager(self):
+        assert self._casd_process_manager is not None, "Only call this with a running buildbox-casd process"
+        return self._casd_process_manager
 
 
 # _CASCacheUsage
@@ -1172,3 +1059,18 @@ def _grouper(iterable, n):
         except StopIteration:
             return
         yield itertools.chain([current], itertools.islice(iterable, n - 1))
+
+
+# _LimitedCASDProcessManagerProxy
+#
+# This can stand-in for an owning CASDProcessManager, for some functions. This
+# is useful when pickling objects that contain a CASDProcessManager - as long
+# as the lifetime of the original exceeds this proxy.
+#
+# Args:
+#     casd_process_manager (CASDProcessManager): The manager to proxy
+#
+class _LimitedCASDProcessManagerProxy:
+    def __init__(self, casd_process_manager):
+        self.socket_path = casd_process_manager.socket_path
+        self.start_time = casd_process_manager.start_time
diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py
new file mode 100644
index 0000000..697b21f
--- /dev/null
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -0,0 +1,220 @@
+#
+#  Copyright (C) 2018 Codethink Limited
+#  Copyright (C) 2018-2019 Bloomberg Finance LP
+#
+#  This program is free software; you can redistribute it and/or
+#  modify it under the terms of the GNU Lesser General Public
+#  License as published by the Free Software Foundation; either
+#  version 2 of the License, or (at your option) any later version.
+#
+#  This library is distributed in the hope that it will be useful,
+#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
+#  Lesser General Public License for more details.
+#
+#  You should have received a copy of the GNU Lesser General Public
+#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import asyncio
+import contextlib
+import os
+import shutil
+import signal
+import subprocess
+import tempfile
+import time
+
+from .. import _signals, utils
+from .._message import Message, MessageType
+
+_CASD_MAX_LOGFILES = 10
+
+
+# CASDProcessManager
+#
+# This manages the subprocess that runs buildbox-casd.
+#
+# Args:
+#     path (str): The root directory for the CAS repository
+#     log_dir (str): The directory for the logs
+#     log_level (LogLevel): Log level to give to buildbox-casd for logging
+#     cache_quota (int): User configured cache quota
+#     protect_session_blobs (bool): Disable expiry for blobs used in the current session
+#
+class CASDProcessManager:
+
+    def __init__(self, path, log_dir, log_level, cache_quota, protect_session_blobs):
+        self._log_dir = log_dir
+
+        # Place socket in global/user temporary directory to avoid hitting
+        # the socket path length limit.
+        self._socket_tempdir = tempfile.mkdtemp(prefix='buildstream')
+        self.socket_path = os.path.join(self._socket_tempdir, 'casd.sock')
+
+        casd_args = [utils.get_host_tool('buildbox-casd')]
+        casd_args.append('--bind=unix:' + self.socket_path)
+        casd_args.append('--log-level=' + log_level.value)
+
+        if cache_quota is not None:
+            casd_args.append('--quota-high={}'.format(int(cache_quota)))
+            casd_args.append('--quota-low={}'.format(int(cache_quota / 2)))
+
+            if protect_session_blobs:
+                casd_args.append('--protect-session-blobs')
+
+        casd_args.append(path)
+
+        self.start_time = time.time()
+        self._logfile = self._rotate_and_get_next_logfile()
+
+        with open(self._logfile, "w") as logfile_fp:
+            # Block SIGINT on buildbox-casd, we don't need to stop it
+            # The frontend will take care of it if needed
+            with _signals.blocked([signal.SIGINT], ignore=False):
+                self._process = subprocess.Popen(
+                    casd_args, cwd=path, stdout=logfile_fp, stderr=subprocess.STDOUT)
+
+        self._failure_callback = None
+        self._watcher = None
+
+    # _rotate_and_get_next_logfile()
+    #
+    # Get the logfile to use for casd
+    #
+    # This will ensure that we don't create too many casd log files by
+    # rotating the logs and only keeping _CASD_MAX_LOGFILES logs around.
+    #
+    # Returns:
+    #   (str): the path to the log file to use
+    #
+    def _rotate_and_get_next_logfile(self):
+        try:
+            existing_logs = sorted(os.listdir(self._log_dir))
+        except FileNotFoundError:
+            os.makedirs(self._log_dir)
+        else:
+            while len(existing_logs) >= _CASD_MAX_LOGFILES:
+                logfile_to_delete = existing_logs.pop(0)
+                os.remove(os.path.join(self._log_dir, logfile_to_delete))
+
+        return os.path.join(self._log_dir, str(self.start_time) + ".log")
+
+    # terminate()
+    #
+    # Terminate the buildbox casd process
+    #
+    # Args:
+    #   messenger (buildstream._messenger.Messenger): Messenger to forward information to the frontend
+    #
+    def terminate(self, messenger=None):
+        assert self._watcher is None
+        assert self._failure_callback is None
+
+        return_code = self._process.poll()
+
+        if return_code is not None:
+            # buildbox-casd is already dead
+            self._process = None
+
+            if messenger:
+                messenger.message(
+                    Message(
+                        MessageType.BUG,
+                        "Buildbox-casd died during the run. Exit code: {}, Logs: {}".format(
+                            return_code, self._logfile
+                        ),
+                    )
+                )
+            return
+
+        self._process.terminate()
+
+        try:
+            # Don't print anything if buildbox-casd terminates quickly
+            return_code = self._process.wait(timeout=0.5)
+        except subprocess.TimeoutExpired:
+            if messenger:
+                cm = messenger.timed_activity("Terminating buildbox-casd")
+            else:
+                cm = contextlib.suppress()
+            with cm:
+                try:
+                    return_code = self._process.wait(timeout=15)
+                except subprocess.TimeoutExpired:
+                    self._process.kill()
+                    self._process.wait(timeout=15)
+
+                    if messenger:
+                        messenger.message(
+                            Message(MessageType.WARN, "Buildbox-casd didn't exit in time and has been killed")
+                        )
+                    self._process = None
+                    return
+
+        if return_code != 0 and messenger:
+            messenger.message(
+                Message(
+                    MessageType.BUG,
+                    "Buildbox-casd didn't exit cleanly. Exit code: {}, Logs: {}".format(
+                        return_code, self._logfile
+                    ),
+                )
+            )
+
+    # clean_up()
+    #
+    # After termination, clean up any additional resources
+    #
+    def clean_up(self):
+        shutil.rmtree(self._socket_tempdir)
+
+    # set_failure_callback()
+    #
+    # Call this function if the CASD process stops unexpectedly.
+    #
+    # Note that we guarantee that the lifetime of any 'watcher' used is bound
+    # to the lifetime of the callback - we won't hang on to the asyncio loop
+    # longer than necessary.
+    #
+    # We won't be able to use watchers on win32, so we'll need to support
+    # another approach.
+    #
+    # Args:
+    #   func (callable): a callable that takes no parameters
+    #
+    def set_failure_callback(self, func):
+        assert func is not None
+        assert self._watcher is None
+        assert self._failure_callback is None
+        self._failure_callback = func
+        self._watcher = asyncio.get_child_watcher()
+        self._watcher.add_child_handler(self._process.pid, self._on_casd_failure)
+
+    # clear_failure_callback()
+    #
+    # No longer call this callable if the CASD process stops unexpectedly
+    #
+    # Args:
+    #   func (callable): The callable that was provided to set_failure_callback().
+    #                    Supplying this again allows us to do error checking.
+    #
+    def clear_failure_callback(self, func):
+        assert func is not None
+        assert self._failure_callback == func
+        self._watcher.remove_child_handler(self._process.pid)
+        self._failure_callback = None
+        self._watcher = None
+
+    # _on_casd_failure()
+    #
+    # Handler for casd process terminating unexpectedly
+    #
+    # Args:
+    #   pid (int): the process id under which buildbox-casd was running
+    #   returncode (int): the return code with which buildbox-casd exited
+    #
+    def _on_casd_failure(self, pid, returncode):
+        assert self._failure_callback is not None
+        self._process.returncode = returncode
+        self._failure_callback()
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index d3faa2a..b8af01c 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -138,7 +138,6 @@ class Scheduler():
         self._suspendtime = None              # Session time compensation for suspended state
         self._queue_jobs = True               # Whether we should continue to queue jobs
         self._state = state
-        self._casd_process = None             # handle to the casd process for monitoring purpose
 
         # Bidirectional queue to send notifications back to the Scheduler's owner
         self._notification_queue = notification_queue
@@ -152,8 +151,8 @@ class Scheduler():
     #
     # Args:
     #    queues (list): A list of Queue objects
-    #    casd_processes (subprocess.Process): The subprocess which runs casd in order to be notified
-    #                                         of failures.
+    #    casd_process_manager (cascache.CASDProcessManager): The subprocess which runs casd, in order to be notified
+    #                                                        of failures.
     #
     # Returns:
     #    (SchedStatus): How the scheduling terminated
@@ -163,7 +162,7 @@ class Scheduler():
     # elements have been processed by each queue or when
     # an error arises
     #
-    def run(self, queues, casd_process):
+    def run(self, queues, casd_process_manager):
 
         # Hold on to the queues to process
         self.queues = queues
@@ -183,9 +182,7 @@ class Scheduler():
         self._connect_signals()
 
         # Watch casd while running to ensure it doesn't die
-        self._casd_process = casd_process
-        _watcher = asyncio.get_child_watcher()
-        _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
+        casd_process_manager.set_failure_callback(self._abort_on_casd_failure)
 
         # Start the profiler
         with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
@@ -195,8 +192,7 @@ class Scheduler():
             self.loop.close()
 
         # Stop watching casd
-        _watcher.remove_child_handler(casd_process.pid)
-        self._casd_process = None
+        casd_process_manager.clear_failure_callback(self._abort_on_casd_failure)
 
         # Stop handling unix signals
         self._disconnect_signals()
@@ -337,15 +333,9 @@ class Scheduler():
     # This will terminate immediately all jobs, since buildbox-casd is dead,
     # we can't do anything with them anymore.
     #
-    # Args:
-    #   pid (int): the process id under which buildbox-casd was running
-    #   returncode (int): the return code with which buildbox-casd exited
-    #
-    def _abort_on_casd_failure(self, pid, returncode):
+    def _abort_on_casd_failure(self):
         message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.")
         self._notify(Notification(NotificationType.MESSAGE, message=message))
-
-        self._casd_process.returncode = returncode
         self.terminate_jobs()
 
     # _start_job()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index c7ada6e..04fdcfc 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1365,7 +1365,7 @@ class Stream():
         if self._session_start_callback is not None:
             self._session_start_callback()
 
-        status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process())
+        status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process_manager())
 
         if status == SchedStatus.ERROR:
             raise StreamError()


[buildstream] 03/06: CASDProcessManager: 'release_resources' convention

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch aevri/casdprocessmanager2
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit bd984334f2d5590fd60d16f775a2132dabac2831
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Fri Oct 11 13:32:21 2019 +0100

    CASDProcessManager: 'release_resources' convention
    
    Elsewhere in cascache, cleaning up is called 'release_resources', so
    follow that convention for consistency.
    
    Also fix a case where self._process was not set to None in terminate().
---
 src/buildstream/_cas/cascache.py           |  3 +--
 src/buildstream/_cas/casdprocessmanager.py | 25 +++++++++++--------------
 2 files changed, 12 insertions(+), 16 deletions(-)

diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index 091b14e..aefc1b9 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -184,8 +184,7 @@ class CASCache():
 
         if self._casd_process_manager:
             self.close_grpc_channels()
-            self._casd_process_manager.terminate(messenger)
-            self._casd_process_manager.clean_up()
+            self._casd_process_manager.release_resources(messenger)
             self._casd_process_manager = None
 
     # contains():
diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py
index 697b21f..3a434ad 100644
--- a/src/buildstream/_cas/casdprocessmanager.py
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -100,14 +100,20 @@ class CASDProcessManager:
 
         return os.path.join(self._log_dir, str(self.start_time) + ".log")
 
-    # terminate()
+    # release_resources()
     #
-    # Terminate the buildbox casd process
+    # Terminate the process and release related resources.
     #
-    # Args:
-    #   messenger (buildstream._messenger.Messenger): Messenger to forward information to the frontend
+    def release_resources(self, messenger=None):
+        self._terminate(messenger)
+        self._process = None
+        shutil.rmtree(self._socket_tempdir)
+
+    # _terminate()
+    #
+    # Terminate the buildbox casd process.
     #
-    def terminate(self, messenger=None):
+    def _terminate(self, messenger=None):
         assert self._watcher is None
         assert self._failure_callback is None
 
@@ -115,7 +121,6 @@ class CASDProcessManager:
 
         if return_code is not None:
             # buildbox-casd is already dead
-            self._process = None
 
             if messenger:
                 messenger.message(
@@ -149,7 +154,6 @@ class CASDProcessManager:
                         messenger.message(
                             Message(MessageType.WARN, "Buildbox-casd didn't exit in time and has been killed")
                         )
-                    self._process = None
                     return
 
         if return_code != 0 and messenger:
@@ -162,13 +166,6 @@ class CASDProcessManager:
                 )
             )
 
-    # clean_up()
-    #
-    # After termination, clean up any additional resources
-    #
-    def clean_up(self):
-        shutil.rmtree(self._socket_tempdir)
-
     # set_failure_callback()
     #
     # Call this function if the CASD process stops unexpectedly.


[buildstream] 06/06: tests/artifactshare: safer cleanup_on_sigterm use

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch aevri/casdprocessmanager2
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 5ad7550a9129de60b5b57a60055c781611119d67
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Mon Oct 28 15:44:51 2019 +0000

    tests/artifactshare: safer cleanup_on_sigterm use
    
    Use the documented path [1] to `pytest_cov.embed.cleanup_on_sigterm()`,
    to avoid crashing on some versions.
    
    It turns out that pytest_cov v2.6.1 on my machine doesn't like the way
    that we were accessing cleanup_on_sigterm(). Access it in such a way
    that we will either get the function or an ImportError, as per the
    documentation.
    
    [1]: https://pytest-cov.readthedocs.io/en/latest/subprocess-support.html
---
 tests/testutils/artifactshare.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py
index 7a4eda1..e49531d 100644
--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -162,11 +162,11 @@ def _start_artifact_server(queue, repodir, quota, index_only):
             signal.signal(signal.SIGTERM, lambda signalnum, frame: sys.exit(0))
 
             try:
-                import pytest_cov
+                from pytest_cov.embed import cleanup_on_sigterm
             except ImportError:
                 pass
             else:
-                pytest_cov.embed.cleanup_on_sigterm()
+                cleanup_on_sigterm()
 
             server = stack.enter_context(
                 create_server(


[buildstream] 05/06: testutils/artifactshare: don't hang on error

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch aevri/casdprocessmanager2
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 71189d46622201550b86de569aa1517adbd328c7
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Mon Oct 28 15:39:54 2019 +0000

    testutils/artifactshare: don't hang on error
    
    Remove a couple of cases where it's possible to make the main test
    process hang, waiting for something to appear on a queue.
    
    Make it clearer what is available to the server process by
    splitting it out into a non-member function.
    
    Raise a friendlier exception, earlier, if there was a problem starting
    the server process.
---
 tests/testutils/artifactshare.py | 84 +++++++++++++++++++++-------------------
 1 file changed, 45 insertions(+), 39 deletions(-)

diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py
index 87b0808..7a4eda1 100644
--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -4,7 +4,7 @@ import signal
 import sys
 from collections import namedtuple
 
-from contextlib import contextmanager
+from contextlib import ExitStack, contextmanager
 from multiprocessing import Process, Queue
 
 from buildstream._cas import CASCache
@@ -50,50 +50,20 @@ class ArtifactShare():
 
         q = Queue()
 
-        self.process = Process(target=self.run, args=(q,))
+        self.process = Process(
+            target=_start_artifact_server,
+            args=(q, self.repodir, self.quota, self.index_only),
+        )
+
         self.process.start()
 
         # Retrieve port from server subprocess
         port = q.get()
 
-        self.repo = 'http://localhost:{}'.format(port)
-
-    # run():
-    #
-    # Run the artifact server.
-    #
-    def run(self, q):
-
-        # Handle SIGTERM by calling sys.exit(0), which will raise a SystemExit exception,
-        # properly executing cleanup code in `finally` clauses and context managers.
-        # This is required to terminate buildbox-casd on SIGTERM.
-        signal.signal(signal.SIGTERM, lambda signalnum, frame: sys.exit(0))
-
-        try:
-            import pytest_cov
-        except ImportError:
-            pass
-        else:
-            pytest_cov.embed.cleanup_on_sigterm()
-
-        try:
-            with create_server(self.repodir,
-                               quota=self.quota,
-                               enable_push=True,
-                               index_only=self.index_only) as server:
-                port = server.add_insecure_port('localhost:0')
-
-                server.start()
-
-                # Send port to parent
-                q.put(port)
+        if port is None:
+            raise Exception("Error occurred when starting artifact server.")
 
-                # Sleep until termination by signal
-                signal.pause()
-
-        except Exception:
-            q.put(None)
-            raise
+        self.repo = 'http://localhost:{}'.format(port)
 
     # has_object():
     #
@@ -183,6 +153,42 @@ class ArtifactShare():
         shutil.rmtree(self.directory)
 
 
+def _start_artifact_server(queue, repodir, quota, index_only):
+    with ExitStack() as stack:
+        try:
+            # Handle SIGTERM by calling sys.exit(0), which will raise a SystemExit exception,
+            # properly executing cleanup code in `finally` clauses and context managers.
+            # This is required to terminate buildbox-casd on SIGTERM.
+            signal.signal(signal.SIGTERM, lambda signalnum, frame: sys.exit(0))
+
+            try:
+                import pytest_cov
+            except ImportError:
+                pass
+            else:
+                pytest_cov.embed.cleanup_on_sigterm()
+
+            server = stack.enter_context(
+                create_server(
+                    repodir,
+                    quota=quota,
+                    enable_push=True,
+                    index_only=index_only,
+                )
+            )
+            port = server.add_insecure_port('localhost:0')
+            server.start()
+        except Exception:
+            queue.put(None)
+            raise
+
+        # Send port to parent
+        queue.put(port)
+
+        # Sleep until termination by signal
+        signal.pause()
+
+
 # create_artifact_share()
 #
 # Create an ArtifactShare for use in a test case


[buildstream] 04/06: Extract casd_channel logic to CASDConnection

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch aevri/casdprocessmanager2
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 84aab60bd8066439ed4971a23b21288d30729a87
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Mon Oct 14 13:53:00 2019 +0100

    Extract casd_channel logic to CASDConnection
    
    Encapsulate the management of a connection to CASD, so we can hide the
    details of how it happens. This will make it easier to port to Windows,
    as we will have to take a different approach there.
    
    Also make get_local_cas() public, since it is already used outside of
    the CASCache class.
---
 src/buildstream/_cas/cascache.py           | 98 ++++++++++++++----------------
 src/buildstream/_cas/casdprocessmanager.py | 83 ++++++++++++++++++++++++-
 src/buildstream/_cas/casremote.py          |  6 +-
 3 files changed, 130 insertions(+), 57 deletions(-)

diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index aefc1b9..65359ff 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -31,14 +31,14 @@ import time
 import grpc
 
 from .._protos.google.rpc import code_pb2
-from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
-from .._protos.build.buildgrid import local_cas_pb2, local_cas_pb2_grpc
+from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
+from .._protos.build.buildgrid import local_cas_pb2
 
 from .. import _signals, utils
 from ..types import FastEnum
 from .._exceptions import CASCacheError
 
-from .casdprocessmanager import CASDProcessManager
+from .casdprocessmanager import CASDConnection, CASDProcessManager
 from .casremote import _CASBatchRead, _CASBatchUpdate
 
 _BUFFER_SIZE = 65536
@@ -74,9 +74,6 @@ class CASCache():
         os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True)
         os.makedirs(self.tmpdir, exist_ok=True)
 
-        self._casd_channel = None
-        self._casd_cas = None
-        self._local_cas = None
         self._cache_usage_monitor = None
         self._cache_usage_monitor_forbidden = False
 
@@ -107,43 +104,12 @@ class CASCache():
 
         return state
 
-    def _init_casd(self):
-        assert self._casd_process_manager, "CASCache was instantiated without buildbox-casd"
-
-        if not self._casd_channel:
-            while not os.path.exists(self._casd_process_manager.socket_path):
-                # casd is not ready yet, try again after a 10ms delay,
-                # but don't wait for more than 15s
-                if time.time() > self._casd_process_manager.start_time + 15:
-                    raise CASCacheError("Timed out waiting for buildbox-casd to become ready")
-
-                time.sleep(0.01)
-
-            self._casd_channel = grpc.insecure_channel('unix:' + self._casd_process_manager.socket_path)
-            self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
-            self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
-
-            # Call GetCapabilities() to establish connection to casd
-            capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self._casd_channel)
-            capabilities.GetCapabilities(remote_execution_pb2.GetCapabilitiesRequest())
-
-    # _get_cas():
-    #
-    # Return ContentAddressableStorage stub for buildbox-casd channel.
-    #
-    def _get_cas(self):
-        if not self._casd_cas:
-            self._init_casd()
-        return self._casd_cas
-
-    # _get_local_cas():
+    # get_local_cas():
     #
     # Return LocalCAS stub for buildbox-casd channel.
     #
-    def _get_local_cas(self):
-        if not self._local_cas:
-            self._init_casd()
-        return self._local_cas
+    def get_local_cas(self):
+        return self._casd_process_manager.get_connection().get_local_cas()
 
     # preflight():
     #
@@ -161,18 +127,17 @@ class CASCache():
     # against fork() with open gRPC channels.
     #
     def has_open_grpc_channels(self):
-        return bool(self._casd_channel)
+        if self._casd_process_manager:
+            return self._casd_process_manager.has_open_grpc_channels()
+        return False
 
     # close_grpc_channels():
     #
     # Close the casd channel if it exists
     #
     def close_grpc_channels(self):
-        if self._casd_channel:
-            self._local_cas = None
-            self._casd_cas = None
-            self._casd_channel.close()
-            self._casd_channel = None
+        if self._casd_process_manager:
+            self._casd_process_manager.close_grpc_channels()
 
     # release_resources():
     #
@@ -390,8 +355,7 @@ class CASCache():
 
             request.path.append(path)
 
-            local_cas = self._get_local_cas()
-
+            local_cas = self.get_local_cas()
             response = local_cas.CaptureFiles(request)
 
             if len(response.responses) != 1:
@@ -417,7 +381,7 @@ class CASCache():
     #     (Digest): The digest of the imported directory
     #
     def import_directory(self, path):
-        local_cas = self._get_local_cas()
+        local_cas = self.get_local_cas()
 
         request = local_cas_pb2.CaptureTreeRequest()
         request.path.append(path)
@@ -537,7 +501,7 @@ class CASCache():
     # Returns: List of missing Digest objects
     #
     def remote_missing_blobs(self, remote, blobs):
-        cas = self._get_cas()
+        cas = self._casd_process_manager.get_connection().get_cas()
         instance_name = remote.local_cas_instance_name
 
         missing_blobs = dict()
@@ -1032,7 +996,7 @@ class _CASCacheUsageMonitor:
 
         disk_usage = self._disk_usage
         disk_quota = self._disk_quota
-        local_cas = self.cas._get_local_cas()
+        local_cas = self.cas.get_local_cas()
 
         while True:
             try:
@@ -1071,5 +1035,33 @@ def _grouper(iterable, n):
 #
 class _LimitedCASDProcessManagerProxy:
     def __init__(self, casd_process_manager):
-        self.socket_path = casd_process_manager.socket_path
-        self.start_time = casd_process_manager.start_time
+        self._casd_connection = None
+        self._connection_string = casd_process_manager.connection_string
+        self._start_time = casd_process_manager.start_time
+        self._socket_path = casd_process_manager.socket_path
+
+    # get_connection():
+    #
+    # Return the CASDConnection for buildbox-casd, creating it on first use.
+    #
+    def get_connection(self):
+        if not self._casd_connection:
+            self._casd_connection = CASDConnection(
+                self._socket_path, self._connection_string, self._start_time)
+        return self._casd_connection
+
+    # has_open_grpc_channels():
+    #
+    # Return whether there are gRPC channel instances. This is used to safeguard
+    # against fork() with open gRPC channels.
+    #
+    def has_open_grpc_channels(self):
+        return bool(self._casd_connection)
+
+    # close_grpc_channels():
+    #
+    # Close the casd channel if it exists
+    #
+    def close_grpc_channels(self):
+        if self._casd_connection:
+            self._casd_connection.close()
diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py
index 3a434ad..c096db1 100644
--- a/src/buildstream/_cas/casdprocessmanager.py
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -25,7 +25,13 @@ import subprocess
 import tempfile
 import time
 
+import grpc
+
+from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
+from .._protos.build.buildgrid import local_cas_pb2_grpc
+
 from .. import _signals, utils
+from .._exceptions import CASCacheError
 from .._message import Message, MessageType
 
 _CASD_MAX_LOGFILES = 10
@@ -47,13 +53,16 @@ class CASDProcessManager:
     def __init__(self, path, log_dir, log_level, cache_quota, protect_session_blobs):
         self._log_dir = log_dir
 
+        self._casd_connection = None
+
         # Place socket in global/user temporary directory to avoid hitting
         # the socket path length limit.
         self._socket_tempdir = tempfile.mkdtemp(prefix='buildstream')
         self.socket_path = os.path.join(self._socket_tempdir, 'casd.sock')
+        self.connection_string = "unix:" + self.socket_path
 
         casd_args = [utils.get_host_tool('buildbox-casd')]
-        casd_args.append('--bind=unix:' + self.socket_path)
+        casd_args.append('--bind=' + self.connection_string)
         casd_args.append('--log-level=' + log_level.value)
 
         if cache_quota is not None:
@@ -215,3 +224,75 @@ class CASDProcessManager:
         assert self._failure_callback is not None
         self._process.returncode = returncode
         self._failure_callback()
+
+    # get_connection():
+    #
+    # Return the CASDConnection for buildbox-casd, creating it on first use.
+    #
+    def get_connection(self):
+        if not self._casd_connection:
+            self._casd_connection = CASDConnection(
+                self.socket_path, self.connection_string, self.start_time)
+        return self._casd_connection
+
+    # has_open_grpc_channels():
+    #
+    # Return whether there are gRPC channel instances. This is used to safeguard
+    # against fork() with open gRPC channels.
+    #
+    def has_open_grpc_channels(self):
+        return bool(self._casd_connection)
+
+    # close_grpc_channels():
+    #
+    # Close the casd channel if it exists
+    #
+    def close_grpc_channels(self):
+        if self._casd_connection:
+            self._casd_connection.close()
+
+
+class CASDConnection:
+    def __init__(self, socket_path, connection_string, start_time):
+        while not os.path.exists(socket_path):
+            # casd is not ready yet, try again after a 10ms delay,
+            # but don't wait for more than 15s
+            if time.time() > start_time + 15:
+                raise CASCacheError("Timed out waiting for buildbox-casd to become ready")
+
+            time.sleep(0.01)
+
+        self._casd_channel = grpc.insecure_channel(connection_string)
+        self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
+        self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
+
+        # Call GetCapabilities() to establish connection to casd
+        capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self._casd_channel)
+        capabilities.GetCapabilities(remote_execution_pb2.GetCapabilitiesRequest())
+
+    # get_cas():
+    #
+    # Return ContentAddressableStorage stub for buildbox-casd channel.
+    #
+    def get_cas(self):
+        assert self._casd_channel is not None
+        return self._casd_cas
+
+    # get_local_cas():
+    #
+    # Return LocalCAS stub for buildbox-casd channel.
+    #
+    def get_local_cas(self):
+        assert self._casd_channel is not None
+        return self._local_cas
+
+    # close():
+    #
+    # Close the casd channel.
+    #
+    def close(self):
+        assert self._casd_channel is not None
+        self._local_cas = None
+        self._casd_cas = None
+        self._casd_channel.close()
+        self._casd_channel = None
diff --git a/src/buildstream/_cas/casremote.py b/src/buildstream/_cas/casremote.py
index a054b28..c89ea9f 100644
--- a/src/buildstream/_cas/casremote.py
+++ b/src/buildstream/_cas/casremote.py
@@ -55,7 +55,7 @@ class CASRemote(BaseRemote):
     # be called outside of init().
     #
     def _configure_protocols(self):
-        local_cas = self.cascache._get_local_cas()
+        local_cas = self.cascache.get_local_cas()
         request = local_cas_pb2.GetInstanceNameForRemoteRequest()
         request.url = self.spec.url
         if self.spec.instance_name:
@@ -115,7 +115,7 @@ class _CASBatchRead():
         if not self._requests:
             return
 
-        local_cas = self._remote.cascache._get_local_cas()
+        local_cas = self._remote.cascache.get_local_cas()
 
         for request in self._requests:
             batch_response = local_cas.FetchMissingBlobs(request)
@@ -163,7 +163,7 @@ class _CASBatchUpdate():
         if not self._requests:
             return
 
-        local_cas = self._remote.cascache._get_local_cas()
+        local_cas = self._remote.cascache.get_local_cas()
 
         for request in self._requests:
             batch_response = local_cas.UploadMissingBlobs(request)