You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by gi...@apache.org on 2020/12/29 13:18:56 UTC
[buildstream] branch aevri/casdprocessmanager2 created (now 5ad7550)
This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a change to branch aevri/casdprocessmanager2
in repository https://gitbox.apache.org/repos/asf/buildstream.git.
at 5ad7550 tests/artifactshare: safer cleanup_on_sigterm use
This branch includes the following new commits:
new c36382f cascache.py: Defer attempt to connect to casd until socket file exists
new caff940 cascache: extract CASDProcess in new module
new bd98433 CASDProcessManager: 'release_resources' convention
new 84aab60 Extract casd_channel logic to CASDConnection
new 71189d4 testutils/artifactshare: don't hang on error
new 5ad7550 tests/artifactshare: safer cleanup_on_sigterm use
The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[buildstream] 01/06: cascache.py: Defer attempt to connect to casd
until socket file exists
Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch aevri/casdprocessmanager2
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit c36382f15fde4908ef514a171d7a974015049b8b
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Thu Oct 17 09:10:49 2019 +0200
cascache.py: Defer attempt to connect to casd until socket file exists
gRPC delays reconnect attempts by at least a second. We don't want to
wait that long. Instead, wait for the socket file to appear, to avoid
the need for multiple connect attempts.
---
src/buildstream/_cas/cascache.py | 22 +++++++++-------------
1 file changed, 9 insertions(+), 13 deletions(-)
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index eb28e2d..4ee575d 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -140,25 +140,21 @@ class CASCache():
assert self._casd_process, "CASCache was instantiated without buildbox-casd"
if not self._casd_channel:
+ while not os.path.exists(self._casd_socket_path):
+ # casd is not ready yet, try again after a 10ms delay,
+ # but don't wait for more than 15s
+ if time.time() > self._casd_start_time + 15:
+ raise CASCacheError("Timed out waiting for buildbox-casd to become ready")
+
+ time.sleep(0.01)
+
self._casd_channel = grpc.insecure_channel('unix:' + self._casd_socket_path)
self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
# Call GetCapabilities() to establish connection to casd
capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self._casd_channel)
- while True:
- try:
- capabilities.GetCapabilities(remote_execution_pb2.GetCapabilitiesRequest())
- break
- except grpc.RpcError as e:
- if e.code() == grpc.StatusCode.UNAVAILABLE:
- # casd is not ready yet, try again after a 10ms delay,
- # but don't wait for more than 15s
- if time.time() < self._casd_start_time + 15:
- time.sleep(1 / 100)
- continue
-
- raise
+ capabilities.GetCapabilities(remote_execution_pb2.GetCapabilitiesRequest())
# _get_cas():
#
[buildstream] 02/06: cascache: extract CASDProcess in new module
Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch aevri/casdprocessmanager2
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit caff94097cbaa3be3463e19c6ba2e6276d86db4e
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Tue Oct 8 17:14:32 2019 +0100
cascache: extract CASDProcess in new module
Make it easier to specialize handling of the buildbox-casd process on
Windows, by splitting it into its own class. This allows us to
encapsulate some decisions, and decreases the complexity of the CASCache
class.
This also takes some of the complexity out of cascache.py, by moving the
responsibility for managing the process into a separate module.
---
src/buildstream/_cas/cascache.py | 168 +++++-----------------
src/buildstream/_cas/casdprocessmanager.py | 220 +++++++++++++++++++++++++++++
src/buildstream/_scheduler/scheduler.py | 22 +--
src/buildstream/_stream.py | 2 +-
4 files changed, 262 insertions(+), 150 deletions(-)
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index 4ee575d..091b14e 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -25,10 +25,7 @@ import errno
import contextlib
import ctypes
import multiprocessing
-import shutil
import signal
-import subprocess
-import tempfile
import time
import grpc
@@ -40,8 +37,8 @@ from .._protos.build.buildgrid import local_cas_pb2, local_cas_pb2_grpc
from .. import _signals, utils
from ..types import FastEnum
from .._exceptions import CASCacheError
-from .._message import Message, MessageType
+from .casdprocessmanager import CASDProcessManager
from .casremote import _CASBatchRead, _CASBatchUpdate
_BUFFER_SIZE = 65536
@@ -50,8 +47,6 @@ _BUFFER_SIZE = 65536
# Refresh interval for disk usage of local cache in seconds
_CACHE_USAGE_REFRESH = 5
-_CASD_MAX_LOGFILES = 10
-
class CASLogLevel(FastEnum):
WARNING = "warning"
@@ -86,45 +81,21 @@ class CASCache():
self._cache_usage_monitor_forbidden = False
if casd:
- # Place socket in global/user temporary directory to avoid hitting
- # the socket path length limit.
- self._casd_socket_tempdir = tempfile.mkdtemp(prefix='buildstream')
- self._casd_socket_path = os.path.join(self._casd_socket_tempdir, 'casd.sock')
-
- casd_args = [utils.get_host_tool('buildbox-casd')]
- casd_args.append('--bind=unix:' + self._casd_socket_path)
- casd_args.append('--log-level=' + log_level.value)
-
- if cache_quota is not None:
- casd_args.append('--quota-high={}'.format(int(cache_quota)))
- casd_args.append('--quota-low={}'.format(int(cache_quota / 2)))
-
- if protect_session_blobs:
- casd_args.append('--protect-session-blobs')
-
- casd_args.append(path)
-
- self._casd_start_time = time.time()
- self.casd_logfile = self._rotate_and_get_next_logfile()
-
- with open(self.casd_logfile, "w") as logfile_fp:
- # Block SIGINT on buildbox-casd, we don't need to stop it
- # The frontend will take care of it if needed
- with _signals.blocked([signal.SIGINT], ignore=False):
- self._casd_process = subprocess.Popen(
- casd_args, cwd=path, stdout=logfile_fp, stderr=subprocess.STDOUT)
+ log_dir = os.path.join(self.casdir, "logs")
+ self._casd_process_manager = CASDProcessManager(
+ path, log_dir, log_level, cache_quota, protect_session_blobs)
self._cache_usage_monitor = _CASCacheUsageMonitor(self)
else:
- self._casd_process = None
+ self._casd_process_manager = None
def __getstate__(self):
state = self.__dict__.copy()
# Popen objects are not pickle-able, however, child processes only
- # need the information whether a casd subprocess was started or not.
- assert '_casd_process' in state
- state['_casd_process'] = bool(self._casd_process)
+ # need some information from the manager, so we can use a proxy.
+ if state['_casd_process_manager'] is not None:
+ state['_casd_process_manager'] = _LimitedCASDProcessManagerProxy(self._casd_process_manager)
# The usage monitor is not pickle-able, but we also don't need it in
# child processes currently. Make sure that if this changes, we get a
@@ -137,18 +108,18 @@ class CASCache():
return state
def _init_casd(self):
- assert self._casd_process, "CASCache was instantiated without buildbox-casd"
+ assert self._casd_process_manager, "CASCache was instantiated without buildbox-casd"
if not self._casd_channel:
- while not os.path.exists(self._casd_socket_path):
+ while not os.path.exists(self._casd_process_manager.socket_path):
# casd is not ready yet, try again after a 10ms delay,
# but don't wait for more than 15s
- if time.time() > self._casd_start_time + 15:
+ if time.time() > self._casd_process_manager.start_time + 15:
raise CASCacheError("Timed out waiting for buildbox-casd to become ready")
time.sleep(0.01)
- self._casd_channel = grpc.insecure_channel('unix:' + self._casd_socket_path)
+ self._casd_channel = grpc.insecure_channel('unix:' + self._casd_process_manager.socket_path)
self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
@@ -211,10 +182,11 @@ class CASCache():
if self._cache_usage_monitor:
self._cache_usage_monitor.release_resources()
- if self._casd_process:
+ if self._casd_process_manager:
self.close_grpc_channels()
- self._terminate_casd_process(messenger)
- shutil.rmtree(self._casd_socket_tempdir)
+ self._casd_process_manager.terminate(messenger)
+ self._casd_process_manager.clean_up()
+ self._casd_process_manager = None
# contains():
#
@@ -691,30 +663,6 @@ class CASCache():
# Local Private Methods #
################################################
- # _rotate_and_get_next_logfile()
- #
- # Get the logfile to use for casd
- #
- # This will ensure that we don't create too many casd log files by
- # rotating the logs and only keeping _CASD_MAX_LOGFILES logs around.
- #
- # Returns:
- # (str): the path to the log file to use
- #
- def _rotate_and_get_next_logfile(self):
- log_dir = os.path.join(self.casdir, "logs")
-
- try:
- existing_logs = sorted(os.listdir(log_dir))
- except FileNotFoundError:
- os.makedirs(log_dir)
- else:
- while len(existing_logs) >= _CASD_MAX_LOGFILES:
- logfile_to_delete = existing_logs.pop(0)
- os.remove(os.path.join(log_dir, logfile_to_delete))
-
- return os.path.join(log_dir, str(self._casd_start_time) + ".log")
-
def _refpath(self, ref):
return os.path.join(self.casdir, 'refs', 'heads', ref)
@@ -983,67 +931,6 @@ class CASCache():
# Upload any blobs missing on the server
self.send_blobs(remote, missing_blobs)
- # _terminate_casd_process()
- #
- # Terminate the buildbox casd process
- #
- # Args:
- # messenger (buildstream._messenger.Messenger): Messenger to forward information to the frontend
- #
- def _terminate_casd_process(self, messenger=None):
- return_code = self._casd_process.poll()
-
- if return_code is not None:
- # buildbox-casd is already dead
- self._casd_process = None
-
- if messenger:
- messenger.message(
- Message(
- MessageType.BUG,
- "Buildbox-casd died during the run. Exit code: {}, Logs: {}".format(
- return_code, self.casd_logfile
- ),
- )
- )
- return
-
- self._casd_process.terminate()
-
- try:
- # Don't print anything if buildbox-casd terminates quickly
- return_code = self._casd_process.wait(timeout=0.5)
- except subprocess.TimeoutExpired:
- if messenger:
- cm = messenger.timed_activity("Terminating buildbox-casd")
- else:
- cm = contextlib.suppress()
- with cm:
- try:
- return_code = self._casd_process.wait(timeout=15)
- except subprocess.TimeoutExpired:
- self._casd_process.kill()
- self._casd_process.wait(timeout=15)
-
- if messenger:
- messenger.message(
- Message(MessageType.WARN, "Buildbox-casd didn't exit in time and has been killed")
- )
- self._casd_process = None
- return
-
- if return_code != 0 and messenger:
- messenger.message(
- Message(
- MessageType.BUG,
- "Buildbox-casd didn't exit cleanly. Exit code: {}, Logs: {}".format(
- return_code, self.casd_logfile
- ),
- )
- )
-
- self._casd_process = None
-
# get_cache_usage():
#
# Fetches the current usage of the CAS local cache.
@@ -1055,16 +942,16 @@ class CASCache():
assert not self._cache_usage_monitor_forbidden
return self._cache_usage_monitor.get_cache_usage()
- # get_casd_process()
+ # get_casd_process_manager()
#
# Get the underlying buildbox-casd process
#
# Returns:
# (subprocess.Process): The casd process that is used for the current cascache
#
- def get_casd_process(self):
- assert self._casd_process is not None, "This should only be called with a running buildbox-casd process"
- return self._casd_process
+ def get_casd_process_manager(self):
+ assert self._casd_process_manager is not None, "Only call this with a running buildbox-casd process"
+ return self._casd_process_manager
# _CASCacheUsage
@@ -1172,3 +1059,18 @@ def _grouper(iterable, n):
except StopIteration:
return
yield itertools.chain([current], itertools.islice(iterable, n - 1))
+
+
+# _LimitedCASDProcessManagerProxy
+#
+# This can stand-in for an owning CASDProcessManager, for some functions. This
+# is useful when pickling objects that contain a CASDProcessManager - as long
+# as the lifetime of the original exceeds this proxy.
+#
+# Args:
+# casd_process_manager (CASDProcessManager): The manager to proxy
+#
+class _LimitedCASDProcessManagerProxy:
+ def __init__(self, casd_process_manager):
+ self.socket_path = casd_process_manager.socket_path
+ self.start_time = casd_process_manager.start_time
diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py
new file mode 100644
index 0000000..697b21f
--- /dev/null
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -0,0 +1,220 @@
+#
+# Copyright (C) 2018 Codethink Limited
+# Copyright (C) 2018-2019 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import asyncio
+import contextlib
+import os
+import shutil
+import signal
+import subprocess
+import tempfile
+import time
+
+from .. import _signals, utils
+from .._message import Message, MessageType
+
+_CASD_MAX_LOGFILES = 10
+
+
+# CASDProcessManager
+#
+# This manages the subprocess that runs buildbox-casd.
+#
+# Args:
+# path (str): The root directory for the CAS repository
+# log_dir (str): The directory for the logs
+# log_level (LogLevel): Log level to give to buildbox-casd for logging
+# cache_quota (int): User configured cache quota
+# protect_session_blobs (bool): Disable expiry for blobs used in the current session
+#
+class CASDProcessManager:
+
+ def __init__(self, path, log_dir, log_level, cache_quota, protect_session_blobs):
+ self._log_dir = log_dir
+
+ # Place socket in global/user temporary directory to avoid hitting
+ # the socket path length limit.
+ self._socket_tempdir = tempfile.mkdtemp(prefix='buildstream')
+ self.socket_path = os.path.join(self._socket_tempdir, 'casd.sock')
+
+ casd_args = [utils.get_host_tool('buildbox-casd')]
+ casd_args.append('--bind=unix:' + self.socket_path)
+ casd_args.append('--log-level=' + log_level.value)
+
+ if cache_quota is not None:
+ casd_args.append('--quota-high={}'.format(int(cache_quota)))
+ casd_args.append('--quota-low={}'.format(int(cache_quota / 2)))
+
+ if protect_session_blobs:
+ casd_args.append('--protect-session-blobs')
+
+ casd_args.append(path)
+
+ self.start_time = time.time()
+ self._logfile = self._rotate_and_get_next_logfile()
+
+ with open(self._logfile, "w") as logfile_fp:
+ # Block SIGINT on buildbox-casd, we don't need to stop it
+ # The frontend will take care of it if needed
+ with _signals.blocked([signal.SIGINT], ignore=False):
+ self._process = subprocess.Popen(
+ casd_args, cwd=path, stdout=logfile_fp, stderr=subprocess.STDOUT)
+
+ self._failure_callback = None
+ self._watcher = None
+
+ # _rotate_and_get_next_logfile()
+ #
+ # Get the logfile to use for casd
+ #
+ # This will ensure that we don't create too many casd log files by
+ # rotating the logs and only keeping _CASD_MAX_LOGFILES logs around.
+ #
+ # Returns:
+ # (str): the path to the log file to use
+ #
+ def _rotate_and_get_next_logfile(self):
+ try:
+ existing_logs = sorted(os.listdir(self._log_dir))
+ except FileNotFoundError:
+ os.makedirs(self._log_dir)
+ else:
+ while len(existing_logs) >= _CASD_MAX_LOGFILES:
+ logfile_to_delete = existing_logs.pop(0)
+ os.remove(os.path.join(self._log_dir, logfile_to_delete))
+
+ return os.path.join(self._log_dir, str(self.start_time) + ".log")
+
+ # terminate()
+ #
+ # Terminate the buildbox casd process
+ #
+ # Args:
+ # messenger (buildstream._messenger.Messenger): Messenger to forward information to the frontend
+ #
+ def terminate(self, messenger=None):
+ assert self._watcher is None
+ assert self._failure_callback is None
+
+ return_code = self._process.poll()
+
+ if return_code is not None:
+ # buildbox-casd is already dead
+ self._process = None
+
+ if messenger:
+ messenger.message(
+ Message(
+ MessageType.BUG,
+ "Buildbox-casd died during the run. Exit code: {}, Logs: {}".format(
+ return_code, self._logfile
+ ),
+ )
+ )
+ return
+
+ self._process.terminate()
+
+ try:
+ # Don't print anything if buildbox-casd terminates quickly
+ return_code = self._process.wait(timeout=0.5)
+ except subprocess.TimeoutExpired:
+ if messenger:
+ cm = messenger.timed_activity("Terminating buildbox-casd")
+ else:
+ cm = contextlib.suppress()
+ with cm:
+ try:
+ return_code = self._process.wait(timeout=15)
+ except subprocess.TimeoutExpired:
+ self._process.kill()
+ self._process.wait(timeout=15)
+
+ if messenger:
+ messenger.message(
+ Message(MessageType.WARN, "Buildbox-casd didn't exit in time and has been killed")
+ )
+ self._process = None
+ return
+
+ if return_code != 0 and messenger:
+ messenger.message(
+ Message(
+ MessageType.BUG,
+ "Buildbox-casd didn't exit cleanly. Exit code: {}, Logs: {}".format(
+ return_code, self._logfile
+ ),
+ )
+ )
+
+ # clean_up()
+ #
+ # After termination, clean up any additional resources
+ #
+ def clean_up(self):
+ shutil.rmtree(self._socket_tempdir)
+
+ # set_failure_callback()
+ #
+ # Call this function if the CASD process stops unexpectedly.
+ #
+ # Note that we guarantee that the lifetime of any 'watcher' used is bound
+ # to the lifetime of the callback - we won't hang on to the asyncio loop
+ # longer than necessary.
+ #
+ # We won't be able to use watchers on win32, so we'll need to support
+ # another approach.
+ #
+ # Args:
+ # func (callable): a callable that takes no parameters
+ #
+ def set_failure_callback(self, func):
+ assert func is not None
+ assert self._watcher is None
+ assert self._failure_callback is None
+ self._failure_callback = func
+ self._watcher = asyncio.get_child_watcher()
+ self._watcher.add_child_handler(self._process.pid, self._on_casd_failure)
+
+ # clear_failure_callback()
+ #
+ # No longer call this callable if the CASD process stops unexpectedly
+ #
+ # Args:
+ # func (callable): The callable that was provided to add_failure_callback().
+ # Supplying this again allows us to do error checking.
+ #
+ def clear_failure_callback(self, func):
+ assert func is not None
+ assert self._failure_callback == func
+ self._watcher.remove_child_handler(self._process.pid)
+ self._failure_callback = None
+ self._watcher = None
+
+ # _on_casd_failure()
+ #
+ # Handler for casd process terminating unexpectedly
+ #
+ # Args:
+ # pid (int): the process id under which buildbox-casd was running
+ # returncode (int): the return code with which buildbox-casd exited
+ #
+ def _on_casd_failure(self, pid, returncode):
+ assert self._failure_callback is not None
+ self._process.returncode = returncode
+ self._failure_callback()
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index d3faa2a..b8af01c 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -138,7 +138,6 @@ class Scheduler():
self._suspendtime = None # Session time compensation for suspended state
self._queue_jobs = True # Whether we should continue to queue jobs
self._state = state
- self._casd_process = None # handle to the casd process for monitoring purpose
# Bidirectional queue to send notifications back to the Scheduler's owner
self._notification_queue = notification_queue
@@ -152,8 +151,8 @@ class Scheduler():
#
# Args:
# queues (list): A list of Queue objects
- # casd_processes (subprocess.Process): The subprocess which runs casd in order to be notified
- # of failures.
+ # casd_process_manager (cascache.CASDProcessManager): The subprocess which runs casd, in order to be notified
+ # of failures.
#
# Returns:
# (SchedStatus): How the scheduling terminated
@@ -163,7 +162,7 @@ class Scheduler():
# elements have been processed by each queue or when
# an error arises
#
- def run(self, queues, casd_process):
+ def run(self, queues, casd_process_manager):
# Hold on to the queues to process
self.queues = queues
@@ -183,9 +182,7 @@ class Scheduler():
self._connect_signals()
# Watch casd while running to ensure it doesn't die
- self._casd_process = casd_process
- _watcher = asyncio.get_child_watcher()
- _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
+ casd_process_manager.set_failure_callback(self._abort_on_casd_failure)
# Start the profiler
with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
@@ -195,8 +192,7 @@ class Scheduler():
self.loop.close()
# Stop watching casd
- _watcher.remove_child_handler(casd_process.pid)
- self._casd_process = None
+ casd_process_manager.clear_failure_callback(self._abort_on_casd_failure)
# Stop handling unix signals
self._disconnect_signals()
@@ -337,15 +333,9 @@ class Scheduler():
# This will terminate immediately all jobs, since buildbox-casd is dead,
# we can't do anything with them anymore.
#
- # Args:
- # pid (int): the process id under which buildbox-casd was running
- # returncode (int): the return code with which buildbox-casd exited
- #
- def _abort_on_casd_failure(self, pid, returncode):
+ def _abort_on_casd_failure(self):
message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.")
self._notify(Notification(NotificationType.MESSAGE, message=message))
-
- self._casd_process.returncode = returncode
self.terminate_jobs()
# _start_job()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index c7ada6e..04fdcfc 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1365,7 +1365,7 @@ class Stream():
if self._session_start_callback is not None:
self._session_start_callback()
- status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process())
+ status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process_manager())
if status == SchedStatus.ERROR:
raise StreamError()
[buildstream] 03/06: CASDProcessManager: 'release_resources'
convention
Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch aevri/casdprocessmanager2
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit bd984334f2d5590fd60d16f775a2132dabac2831
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Fri Oct 11 13:32:21 2019 +0100
CASDProcessManager: 'release_resources' convention
Elsewhere in cascache, cleaning up is called 'release_resources', so
follow that convention for consistency.
Also fix the case where terminate() did not set self._process to None
(when buildbox-casd exited within the timeout); release_resources() now
clears it unconditionally.
---
src/buildstream/_cas/cascache.py | 3 +--
src/buildstream/_cas/casdprocessmanager.py | 25 +++++++++++--------------
2 files changed, 12 insertions(+), 16 deletions(-)
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index 091b14e..aefc1b9 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -184,8 +184,7 @@ class CASCache():
if self._casd_process_manager:
self.close_grpc_channels()
- self._casd_process_manager.terminate(messenger)
- self._casd_process_manager.clean_up()
+ self._casd_process_manager.release_resources(messenger)
self._casd_process_manager = None
# contains():
diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py
index 697b21f..3a434ad 100644
--- a/src/buildstream/_cas/casdprocessmanager.py
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -100,14 +100,20 @@ class CASDProcessManager:
return os.path.join(self._log_dir, str(self.start_time) + ".log")
- # terminate()
+ # release_resources()
#
- # Terminate the buildbox casd process
+ # Terminate the process and release related resources.
#
- # Args:
- # messenger (buildstream._messenger.Messenger): Messenger to forward information to the frontend
+ def release_resources(self, messenger=None):
+ self._terminate(messenger)
+ self._process = None
+ shutil.rmtree(self._socket_tempdir)
+
+ # _terminate()
+ #
+ # Terminate the buildbox casd process.
#
- def terminate(self, messenger=None):
+ def _terminate(self, messenger=None):
assert self._watcher is None
assert self._failure_callback is None
@@ -115,7 +121,6 @@ class CASDProcessManager:
if return_code is not None:
# buildbox-casd is already dead
- self._process = None
if messenger:
messenger.message(
@@ -149,7 +154,6 @@ class CASDProcessManager:
messenger.message(
Message(MessageType.WARN, "Buildbox-casd didn't exit in time and has been killed")
)
- self._process = None
return
if return_code != 0 and messenger:
@@ -162,13 +166,6 @@ class CASDProcessManager:
)
)
- # clean_up()
- #
- # After termination, clean up any additional resources
- #
- def clean_up(self):
- shutil.rmtree(self._socket_tempdir)
-
# set_failure_callback()
#
# Call this function if the CASD process stops unexpectedly.
[buildstream] 06/06: tests/artifactshare: safer cleanup_on_sigterm
use
Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch aevri/casdprocessmanager2
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 5ad7550a9129de60b5b57a60055c781611119d67
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Mon Oct 28 15:44:51 2019 +0000
tests/artifactshare: safer cleanup_on_sigterm use
Use the documented path [1] to `pytest_cov.embed.cleanup_on_sigterm()`,
to avoid crashing on some versions.
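It turns out that, with pytest_cov v2.6.1 on my machine, `import pytest_cov`
does not necessarily import the `embed` submodule, so accessing
`pytest_cov.embed.cleanup_on_sigterm()` through the package can fail.
Import it with `from pytest_cov.embed import cleanup_on_sigterm` instead,
so that we will either get the function or an ImportError, as per the
documentation.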
[1]: https://pytest-cov.readthedocs.io/en/latest/subprocess-support.html
---
tests/testutils/artifactshare.py | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py
index 7a4eda1..e49531d 100644
--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -162,11 +162,11 @@ def _start_artifact_server(queue, repodir, quota, index_only):
signal.signal(signal.SIGTERM, lambda signalnum, frame: sys.exit(0))
try:
- import pytest_cov
+ from pytest_cov.embed import cleanup_on_sigterm
except ImportError:
pass
else:
- pytest_cov.embed.cleanup_on_sigterm()
+ cleanup_on_sigterm()
server = stack.enter_context(
create_server(
[buildstream] 05/06: testutils/artifactshare: don't hang on error
Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch aevri/casdprocessmanager2
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 71189d46622201550b86de569aa1517adbd328c7
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Mon Oct 28 15:39:54 2019 +0000
testutils/artifactshare: don't hang on error
Remove a couple of cases where it's possible to make the main test
process hang, waiting for something to appear on a queue.
Make it clearer what is available to the server process by
splitting it out into a non-member function.
Raise a friendlier exception, earlier, if there was a problem starting
the server process.
---
tests/testutils/artifactshare.py | 84 +++++++++++++++++++++-------------------
1 file changed, 45 insertions(+), 39 deletions(-)
diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py
index 87b0808..7a4eda1 100644
--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -4,7 +4,7 @@ import signal
import sys
from collections import namedtuple
-from contextlib import contextmanager
+from contextlib import ExitStack, contextmanager
from multiprocessing import Process, Queue
from buildstream._cas import CASCache
@@ -50,50 +50,20 @@ class ArtifactShare():
q = Queue()
- self.process = Process(target=self.run, args=(q,))
+ self.process = Process(
+ target=_start_artifact_server,
+ args=(q, self.repodir, self.quota, self.index_only),
+ )
+
self.process.start()
# Retrieve port from server subprocess
port = q.get()
- self.repo = 'http://localhost:{}'.format(port)
-
- # run():
- #
- # Run the artifact server.
- #
- def run(self, q):
-
- # Handle SIGTERM by calling sys.exit(0), which will raise a SystemExit exception,
- # properly executing cleanup code in `finally` clauses and context managers.
- # This is required to terminate buildbox-casd on SIGTERM.
- signal.signal(signal.SIGTERM, lambda signalnum, frame: sys.exit(0))
-
- try:
- import pytest_cov
- except ImportError:
- pass
- else:
- pytest_cov.embed.cleanup_on_sigterm()
-
- try:
- with create_server(self.repodir,
- quota=self.quota,
- enable_push=True,
- index_only=self.index_only) as server:
- port = server.add_insecure_port('localhost:0')
-
- server.start()
-
- # Send port to parent
- q.put(port)
+ if port is None:
+ raise Exception("Error occurred when starting artifact server.")
- # Sleep until termination by signal
- signal.pause()
-
- except Exception:
- q.put(None)
- raise
+ self.repo = 'http://localhost:{}'.format(port)
# has_object():
#
@@ -183,6 +153,42 @@ class ArtifactShare():
shutil.rmtree(self.directory)
+def _start_artifact_server(queue, repodir, quota, index_only):
+ with ExitStack() as stack:
+ try:
+ # Handle SIGTERM by calling sys.exit(0), which will raise a SystemExit exception,
+ # properly executing cleanup code in `finally` clauses and context managers.
+ # This is required to terminate buildbox-casd on SIGTERM.
+ signal.signal(signal.SIGTERM, lambda signalnum, frame: sys.exit(0))
+
+ try:
+ import pytest_cov
+ except ImportError:
+ pass
+ else:
+ pytest_cov.embed.cleanup_on_sigterm()
+
+ server = stack.enter_context(
+ create_server(
+ repodir,
+ quota=quota,
+ enable_push=True,
+ index_only=index_only,
+ )
+ )
+ port = server.add_insecure_port('localhost:0')
+ server.start()
+ except Exception:
+ queue.put(None)
+ raise
+
+ # Send port to parent
+ queue.put(port)
+
+ # Sleep until termination by signal
+ signal.pause()
+
+
# create_artifact_share()
#
# Create an ArtifactShare for use in a test case
[buildstream] 04/06: Extract casd_channel logic to CASDConnection
Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch aevri/casdprocessmanager2
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 84aab60bd8066439ed4971a23b21288d30729a87
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Mon Oct 14 13:53:00 2019 +0100
Extract casd_channel logic to CASDConnection
Encapsulate the management of a connection to CASD, so we can hide the
details of how it happens. This will make it easier to port to Windows,
as we will have to take a different approach there.
Also make get_local_cas() public, since it is already used outside of
the CASCache class.
---
src/buildstream/_cas/cascache.py | 98 ++++++++++++++----------------
src/buildstream/_cas/casdprocessmanager.py | 83 ++++++++++++++++++++++++-
src/buildstream/_cas/casremote.py | 6 +-
3 files changed, 130 insertions(+), 57 deletions(-)
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index aefc1b9..65359ff 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -31,14 +31,14 @@ import time
import grpc
from .._protos.google.rpc import code_pb2
-from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
-from .._protos.build.buildgrid import local_cas_pb2, local_cas_pb2_grpc
+from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
+from .._protos.build.buildgrid import local_cas_pb2
from .. import _signals, utils
from ..types import FastEnum
from .._exceptions import CASCacheError
-from .casdprocessmanager import CASDProcessManager
+from .casdprocessmanager import CASDConnection, CASDProcessManager
from .casremote import _CASBatchRead, _CASBatchUpdate
_BUFFER_SIZE = 65536
@@ -74,9 +74,6 @@ class CASCache():
os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True)
os.makedirs(self.tmpdir, exist_ok=True)
- self._casd_channel = None
- self._casd_cas = None
- self._local_cas = None
self._cache_usage_monitor = None
self._cache_usage_monitor_forbidden = False
@@ -107,43 +104,12 @@ class CASCache():
return state
- def _init_casd(self):
- assert self._casd_process_manager, "CASCache was instantiated without buildbox-casd"
-
- if not self._casd_channel:
- while not os.path.exists(self._casd_process_manager.socket_path):
- # casd is not ready yet, try again after a 10ms delay,
- # but don't wait for more than 15s
- if time.time() > self._casd_process_manager.start_time + 15:
- raise CASCacheError("Timed out waiting for buildbox-casd to become ready")
-
- time.sleep(0.01)
-
- self._casd_channel = grpc.insecure_channel('unix:' + self._casd_process_manager.socket_path)
- self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
- self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
-
- # Call GetCapabilities() to establish connection to casd
- capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self._casd_channel)
- capabilities.GetCapabilities(remote_execution_pb2.GetCapabilitiesRequest())
-
- # _get_cas():
- #
- # Return ContentAddressableStorage stub for buildbox-casd channel.
- #
- def _get_cas(self):
- if not self._casd_cas:
- self._init_casd()
- return self._casd_cas
-
-    # _get_local_cas():
+    # get_local_cas():
     #
     # Return LocalCAS stub for buildbox-casd channel.
     #
-    def _get_local_cas(self):
-        if not self._local_cas:
-            self._init_casd()
-        return self._local_cas
+    def get_local_cas(self):
+        # _casd_process_manager may be None; fail with a clear message
+        # rather than an AttributeError on None.
+        assert self._casd_process_manager, "CASCache was instantiated without buildbox-casd"
+        return self._casd_process_manager.get_connection().get_local_cas()
# preflight():
#
@@ -161,18 +127,17 @@ class CASCache():
# against fork() with open gRPC channels.
#
def has_open_grpc_channels(self):
-        return bool(self._casd_channel)
+        # Without a casd process manager there is no connection to be open.
+        if not self._casd_process_manager:
+            return False
+        return self._casd_process_manager.has_open_grpc_channels()
# close_grpc_channels():
#
# Close the casd channel if it exists
#
def close_grpc_channels(self):
-        if self._casd_channel:
-            self._local_cas = None
-            self._casd_cas = None
-            self._casd_channel.close()
-            self._casd_channel = None
+        # The casd connection, if any, is owned by the process manager.
+        manager = self._casd_process_manager
+        if manager:
+            manager.close_grpc_channels()
# release_resources():
#
@@ -390,8 +355,7 @@ class CASCache():
request.path.append(path)
- local_cas = self._get_local_cas()
-
+ local_cas = self.get_local_cas()
response = local_cas.CaptureFiles(request)
if len(response.responses) != 1:
@@ -417,7 +381,7 @@ class CASCache():
# (Digest): The digest of the imported directory
#
def import_directory(self, path):
- local_cas = self._get_local_cas()
+ local_cas = self.get_local_cas()
request = local_cas_pb2.CaptureTreeRequest()
request.path.append(path)
@@ -537,7 +501,7 @@ class CASCache():
# Returns: List of missing Digest objects
#
def remote_missing_blobs(self, remote, blobs):
- cas = self._get_cas()
+ cas = self._casd_process_manager.get_connection().get_cas()
instance_name = remote.local_cas_instance_name
missing_blobs = dict()
@@ -1032,7 +996,7 @@ class _CASCacheUsageMonitor:
disk_usage = self._disk_usage
disk_quota = self._disk_quota
- local_cas = self.cas._get_local_cas()
+ local_cas = self.cas.get_local_cas()
while True:
try:
@@ -1071,5 +1035,33 @@ def _grouper(iterable, n):
#
class _LimitedCASDProcessManagerProxy:
def __init__(self, casd_process_manager):
-        self.socket_path = casd_process_manager.socket_path
-        self.start_time = casd_process_manager.start_time
+        self._casd_connection = None  # created lazily by get_connection()
+        self._connection_string = casd_process_manager.connection_string
+        self._start_time = casd_process_manager.start_time
+        self._socket_path = casd_process_manager.socket_path
+
+    # get_connection():
+    #
+    # Return the CASDConnection to buildbox-casd, creating it on first use.
+    #
+    def get_connection(self):
+        if not self._casd_connection:
+            self._casd_connection = CASDConnection(
+                self._socket_path, self._connection_string, self._start_time)
+        return self._casd_connection
+
+    # has_open_grpc_channels():
+    #
+    # Return whether there are gRPC channel instances. This is used to safeguard
+    # against fork() with open gRPC channels.
+    #
+    def has_open_grpc_channels(self):
+        return bool(self._casd_connection)
+
+    # close_grpc_channels():
+    #
+    # Close the casd channel if it exists, and forget the connection so that
+    # has_open_grpc_channels() returns False and get_connection() reconnects.
+    #
+    def close_grpc_channels(self):
+        if self._casd_connection:
+            self._casd_connection.close()
+            # Keeping the closed connection would make has_open_grpc_channels()
+            # lie and make get_connection() return an unusable connection.
+            self._casd_connection = None
diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py
index 3a434ad..c096db1 100644
--- a/src/buildstream/_cas/casdprocessmanager.py
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -25,7 +25,13 @@ import subprocess
import tempfile
import time
+import grpc
+
+from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
+from .._protos.build.buildgrid import local_cas_pb2_grpc
+
from .. import _signals, utils
+from .._exceptions import CASCacheError
from .._message import Message, MessageType
_CASD_MAX_LOGFILES = 10
@@ -47,13 +53,16 @@ class CASDProcessManager:
def __init__(self, path, log_dir, log_level, cache_quota, protect_session_blobs):
self._log_dir = log_dir
+ self._casd_connection = None
+
# Place socket in global/user temporary directory to avoid hitting
# the socket path length limit.
self._socket_tempdir = tempfile.mkdtemp(prefix='buildstream')
self.socket_path = os.path.join(self._socket_tempdir, 'casd.sock')
+ self.connection_string = "unix:" + self.socket_path
casd_args = [utils.get_host_tool('buildbox-casd')]
- casd_args.append('--bind=unix:' + self.socket_path)
+ casd_args.append('--bind=' + self.connection_string)
casd_args.append('--log-level=' + log_level.value)
if cache_quota is not None:
@@ -215,3 +224,75 @@ class CASDProcessManager:
assert self._failure_callback is not None
self._process.returncode = returncode
self._failure_callback()
+
+    # get_connection():
+    #
+    # Return the CASDConnection to buildbox-casd, creating it on first use
+    # (creation waits for the casd socket file to appear).
+    #
+    def get_connection(self):
+        if not self._casd_connection:
+            self._casd_connection = CASDConnection(
+                self.socket_path, self.connection_string, self.start_time)
+        return self._casd_connection
+
+    # has_open_grpc_channels():
+    #
+    # Report whether a casd connection (and hence a gRPC channel) currently
+    # exists. Used to safeguard against fork() with open gRPC channels.
+    #
+    def has_open_grpc_channels(self):
+        return self._casd_connection is not None
+
+    # close_grpc_channels():
+    #
+    # Close the casd channel if it exists, and forget the connection so that
+    # has_open_grpc_channels() returns False and get_connection() reconnects.
+    #
+    def close_grpc_channels(self):
+        if self._casd_connection:
+            self._casd_connection.close()
+            # A closed CASDConnection asserts on any further use (including a
+            # second close()), so it must not be handed out again.
+            self._casd_connection = None
+
+
+# CASDConnection()
+#
+# A gRPC connection to buildbox-casd, providing the ContentAddressableStorage
+# and LocalContentAddressableStorage stubs for it.
+#
+# Args:
+#    socket_path (str): Path of the casd socket file, waited for before connecting
+#    connection_string (str): gRPC target to connect to, e.g. "unix:" + socket_path
+#    start_time (float): time.time() at which casd was started; waiting for the
+#                        socket gives up 15 seconds after this
+#
+class CASDConnection:
+    def __init__(self, socket_path, connection_string, start_time):
+        while not os.path.exists(socket_path):
+            # casd is not ready yet, try again after a 10ms delay,
+            # but don't wait for more than 15s
+            if time.time() > start_time + 15:
+                raise CASCacheError("Timed out waiting for buildbox-casd to become ready")
+
+            time.sleep(0.01)
+
+        self._casd_channel = grpc.insecure_channel(connection_string)
+        self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
+        self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
+
+        # Call GetCapabilities() to establish connection to casd
+        try:
+            capabilities = remote_execution_pb2_grpc.CapabilitiesStub(self._casd_channel)
+            capabilities.GetCapabilities(remote_execution_pb2.GetCapabilitiesRequest())
+        except BaseException:
+            # The caller never receives this object if we raise, so close the
+            # channel here instead of leaving an unreachable open channel behind.
+            self.close()
+            raise
+
+    # get_cas():
+    #
+    # Return ContentAddressableStorage stub for buildbox-casd channel.
+    # Must not be called after close().
+    #
+    def get_cas(self):
+        assert self._casd_channel is not None
+        return self._casd_cas
+
+    # get_local_cas():
+    #
+    # Return LocalCAS stub for buildbox-casd channel.
+    # Must not be called after close().
+    #
+    def get_local_cas(self):
+        assert self._casd_channel is not None
+        return self._local_cas
+
+    # close():
+    #
+    # Close the casd channel and drop the stubs. Must be called at most once.
+    #
+    def close(self):
+        assert self._casd_channel is not None
+        self._local_cas = None
+        self._casd_cas = None
+        self._casd_channel.close()
+        self._casd_channel = None
diff --git a/src/buildstream/_cas/casremote.py b/src/buildstream/_cas/casremote.py
index a054b28..c89ea9f 100644
--- a/src/buildstream/_cas/casremote.py
+++ b/src/buildstream/_cas/casremote.py
@@ -55,7 +55,7 @@ class CASRemote(BaseRemote):
# be called outside of init().
#
def _configure_protocols(self):
- local_cas = self.cascache._get_local_cas()
+ local_cas = self.cascache.get_local_cas()
request = local_cas_pb2.GetInstanceNameForRemoteRequest()
request.url = self.spec.url
if self.spec.instance_name:
@@ -115,7 +115,7 @@ class _CASBatchRead():
if not self._requests:
return
- local_cas = self._remote.cascache._get_local_cas()
+ local_cas = self._remote.cascache.get_local_cas()
for request in self._requests:
batch_response = local_cas.FetchMissingBlobs(request)
@@ -163,7 +163,7 @@ class _CASBatchUpdate():
if not self._requests:
return
- local_cas = self._remote.cascache._get_local_cas()
+ local_cas = self._remote.cascache.get_local_cas()
for request in self._requests:
batch_response = local_cas.UploadMissingBlobs(request)