You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by ro...@apache.org on 2020/12/29 13:46:14 UTC
[buildstream] 02/19: casserver.py: Run buildbox-casd in `bst-artifact-server`
This is an automated email from the ASF dual-hosted git repository.
root pushed a commit to branch tlater/casd-socket-permissions
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 7c6d27b5dcfd4434a759febeae9195995f44a304
Author: Tristan Maat <tr...@codethink.co.uk>
AuthorDate: Tue Oct 15 11:38:29 2019 +0100
casserver.py: Run buildbox-casd in `bst-artifact-server`
This is in preparation for a switch to calling buildbox-casd directly
through the `bst-artifact-server`, rather than relying on BuildStream
internals to do this.
This should help untangle the codebase a little, since our CAS
interfaces will all end up with a single goal, rather than trying to
do both server-end and client-end things.
---
src/buildstream/_cas/casserver.py | 121 +++++++++++++++++++++++++++++++++++++-
1 file changed, 120 insertions(+), 1 deletion(-)
diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py
index d5a29a3..c0c62b0 100644
--- a/src/buildstream/_cas/casserver.py
+++ b/src/buildstream/_cas/casserver.py
@@ -18,13 +18,15 @@
# Jürg Billeter <ju...@codethink.co.uk>
from concurrent import futures
-from contextlib import contextmanager
from enum import Enum
+import contextlib
import logging
import os
import signal
+import subprocess
import sys
import tempfile
+import time
import uuid
import errno
@@ -78,6 +80,120 @@ class ClickLogLevel(click.Choice):
return LogLevel(super().convert(value, param, ctx))
# CASdRunner():
#
# Manage the lifecycle of a buildbox-casd child process.
#
# Args:
#    path: Root directory handed to buildbox-casd; the "cas" and "tmp"
#          directories are created below it
#    cache_quota: Optional quota; passed as --quota-high, with half of it
#                 passed as --quota-low
#    log_level: Level passed to buildbox-casd via --log-level
#
# FIXME: Probably better to replace this with the work from !1638
#
class CASdRunner:
    # Seconds to wait for buildbox-casd to exit after SIGTERM, and again
    # after SIGKILL, before giving up.
    _STOP_TIMEOUT = 15

    def __init__(self, path: str, *, cache_quota: int = None, log_level: LogLevel = LogLevel.WARNING):
        self.root = path
        self.casdir = os.path.join(path, "cas")
        self.tmpdir = os.path.join(path, "tmp")

        self._casd_process = None
        self._casd_socket_path = None
        self._casd_socket_tempdir = None
        self._log_level = log_level
        self._quota = cache_quota

    # start_casd():
    #
    # Start the CASd process.
    #
    # Creates the on-disk layout below the root directory, allocates a
    # temporary directory for the UNIX socket and spawns buildbox-casd.
    #
    # Raises:
    #    AssertionError: If CASd was already started
    #
    def start_casd(self):
        assert not self._casd_process, "CASd was already started"

        # Resolve the binary first so that a missing tool does not leave
        # a stray temporary directory behind.
        casd_binary = utils.get_host_tool("buildbox-casd")

        os.makedirs(os.path.join(self.casdir, "refs", "heads"), exist_ok=True)
        os.makedirs(os.path.join(self.casdir, "objects"), exist_ok=True)
        os.makedirs(self.tmpdir, exist_ok=True)

        # Place socket in global/user temporary directory to avoid hitting
        # the socket path length limit.
        self._casd_socket_tempdir = tempfile.mkdtemp(prefix="buildstream")
        self._casd_socket_path = os.path.join(self._casd_socket_tempdir, "casd.sock")

        casd_args = [
            casd_binary,
            "--bind=unix:" + self._casd_socket_path,
            "--log-level=" + self._log_level.value,
        ]

        if self._quota is not None:
            quota = int(self._quota)
            casd_args.append("--quota-high={}".format(quota))
            # Integer division avoids a float round trip for large quotas.
            casd_args.append("--quota-low={}".format(quota // 2))

        casd_args.append(self.root)

        # Block SIGINT while spawning; the child inherits the signal mask,
        # so the mask is restored in this process right after the spawn.
        blocked_signals = signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGINT])

        try:
            self._casd_process = subprocess.Popen(
                casd_args,
                cwd=self.root,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
            )
        except BaseException:
            # The spawn failed, so nothing will ever use the socket directory.
            self._remove_socket_dir()
            raise
        finally:
            signal.pthread_sigmask(signal.SIG_SETMASK, blocked_signals)

    # stop():
    #
    # Stop and tear down the CASd process.
    #
    # Does nothing if CASd is not running. Output captured from the process
    # is only logged when it died early, had to be killed, or exited with a
    # non-zero code. The socket directory is always removed.
    #
    def stop(self):
        process = self._casd_process
        if process is None:
            return

        try:
            return_code = process.poll()

            if return_code is not None:
                logging.error(
                    "Buildbox-casd died during the run. Exit code: %s", return_code
                )
                logging.error(self._collect_output(process))
                return

            process.terminate()

            try:
                # communicate() drains the pipe while waiting, so a chatty
                # child cannot block on a full stdout pipe during shutdown.
                output = self._collect_output(process)
            except subprocess.TimeoutExpired:
                process.kill()
                output = self._collect_output(process)
                logging.warning(
                    "Buildbox-casd didn't exit in time and has been killed"
                )
                logging.error(output)
                return

            if process.returncode != 0:
                logging.error(
                    "Buildbox-casd didn't exit cleanly. Exit code: %d", process.returncode
                )
                logging.error(output)
        finally:
            self._casd_process = None
            self._remove_socket_dir()

    # _collect_output():
    #
    # Wait for the process to exit and return everything it wrote to
    # stdout/stderr as text.
    #
    # Raises:
    #    subprocess.TimeoutExpired: If the process is still running after
    #                               _STOP_TIMEOUT seconds
    #
    def _collect_output(self, process) -> str:
        output, _ = process.communicate(timeout=self._STOP_TIMEOUT)
        # Never let undecodable bytes turn a log message into a crash.
        return (output or b"").decode(errors="replace")

    # _remove_socket_dir():
    #
    # Remove the temporary directory holding the CASd socket, if any, and
    # forget the socket path.
    #
    def _remove_socket_dir(self):
        # Imported locally: only needed here and not among the module imports.
        import shutil

        if self._casd_socket_tempdir is not None:
            shutil.rmtree(self._casd_socket_tempdir, ignore_errors=True)

        self._casd_socket_tempdir = None
        self._casd_socket_path = None

    # get_socket_path():
    #
    # Get the path to the socket of the CASd process.
    #
    # Raises:
    #    AssertionError: If CASd has not been started (or was stopped)
    #
    def get_socket_path(self) -> str:
        assert self._casd_socket_path is not None, "CASd has not been started"
        return self._casd_socket_path

    # get_casdir():
    #
    # Get the path to the directory managed by CASd.
    #
    def get_casdir(self) -> str:
        return self.casdir
+
+
# create_server():
#
# Create gRPC CAS artifact server as specified in the Remote Execution API.
@@ -96,6 +212,8 @@ def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.WA
logger.addHandler(handler)
cas = CASCache(os.path.abspath(repo), cache_quota=quota, protect_session_blobs=False)
+ cas_runner = CASdRunner(os.path.abspath(repo), cache_quota=quota)
+ cas_runner.start_casd()
try:
artifactdir = os.path.join(os.path.abspath(repo), 'artifacts', 'refs')
@@ -137,6 +255,7 @@ def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.WA
finally:
cas.release_resources()
+ cas_runner.stop()
@click.command(short_help="CAS Artifact Server")