You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by ro...@apache.org on 2020/12/29 13:46:14 UTC

[buildstream] 02/19: casserver.py: Run buildbox-casd in `bst-artifact-server`

This is an automated email from the ASF dual-hosted git repository.

root pushed a commit to branch tlater/casd-socket-permissions
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 7c6d27b5dcfd4434a759febeae9195995f44a304
Author: Tristan Maat <tr...@codethink.co.uk>
AuthorDate: Tue Oct 15 11:38:29 2019 +0100

    casserver.py: Run buildbox-casd in `bst-artifact-server`
    
    This is in preparation for a switch to directly calling buildbox-casd
    through the `bst-artifact-server`, rather than relying on BuildStream
    internals to do this.
    
    This should help untangle the codebase a little, since our CAS
    interfaces will all end up with a single goal, rather than trying to
    do both server-end and client-end things.
---
 src/buildstream/_cas/casserver.py | 121 +++++++++++++++++++++++++++++++++++++-
 1 file changed, 120 insertions(+), 1 deletion(-)

diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py
index d5a29a3..c0c62b0 100644
--- a/src/buildstream/_cas/casserver.py
+++ b/src/buildstream/_cas/casserver.py
@@ -18,13 +18,15 @@
 #        Jürg Billeter <ju...@codethink.co.uk>
 
 from concurrent import futures
-from contextlib import contextmanager
 from enum import Enum
+import contextlib
 import logging
 import os
 import signal
+import subprocess
 import sys
 import tempfile
+import time
 import uuid
 import errno
 
@@ -78,6 +80,120 @@ class ClickLogLevel(click.Choice):
         return LogLevel(super().convert(value, param, ctx))
 
 
+# CASdRunner():
+#
+# Manage a buildbox-casd process.
+#
+# FIXME: Probably better to replace this with the work from !1638
+#
+class CASdRunner:
+    def __init__(self, path: str, *, cache_quota: int = None, log_level: LogLevel = LogLevel.WARNING):
+        self.root = path
+        self.casdir = os.path.join(path, "cas")
+        self.tmpdir = os.path.join(path, "tmp")
+
+        self._casd_process = None
+        self._casd_socket_path = None
+        self._casd_socket_tempdir = None
+        self._log_level = log_level
+        self._quota = cache_quota
+
+    # start_casd():
+    #
+    # Start the CASd process.
+    #
+    def start_casd(self):
+        assert not self._casd_process, "CASd was already started"
+
+        os.makedirs(os.path.join(self.casdir, "refs", "heads"), exist_ok=True)
+        os.makedirs(os.path.join(self.casdir, "objects"), exist_ok=True)
+        os.makedirs(self.tmpdir, exist_ok=True)
+
+        # Place socket in global/user temporary directory to avoid hitting
+        # the socket path length limit.
+        self._casd_socket_tempdir = tempfile.mkdtemp(prefix="buildstream")
+        self._casd_socket_path = os.path.join(self._casd_socket_tempdir, "casd.sock")
+
+        casd_args = [utils.get_host_tool("buildbox-casd")]
+        casd_args.append("--bind=unix:" + self._casd_socket_path)
+        casd_args.append("--log-level=" + self._log_level.value)
+
+        if self._quota is not None:
+            casd_args.append("--quota-high={}".format(int(self._quota)))
+            casd_args.append("--quota-low={}".format(int(self._quota / 2)))
+
+        casd_args.append(self.root)
+
+        blocked_signals = signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGINT])
+
+        try:
+            self._casd_process = subprocess.Popen(
+                casd_args,
+                cwd=self.root,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.STDOUT,
+            )
+        finally:
+            signal.pthread_sigmask(signal.SIG_SETMASK, blocked_signals)
+
+    # stop():
+    #
+    # Stop and tear down the CASd process.
+    #
+    def stop(self):
+        return_code = self._casd_process.poll()
+
+        if return_code is not None:
+            self._casd_process = None
+            logging.error(
+                "Buildbox-casd died during the run. Exit code: %s", return_code
+            )
+            logging.error(self._casd_process.stdout.read().decode())
+            return
+
+        self._casd_process.terminate()
+
+        try:
+            return_code = self._casd_process.wait(timeout=0.5)
+        except subprocess.TimeoutExpired:
+            with contextlib.suppress():
+                try:
+                    return_code = self._casd_process.wait(timeout=15)
+                except subprocess.TimeoutExpired:
+                    self._casd_process.kill()
+                    self._casd_process.wait(timeout=15)
+                    logging.warning(
+                        "Buildbox-casd didn't exit in time and has been killed"
+                    )
+                    logging.error(self._casd_process.stdout.read().decode())
+                    self._casd_process = None
+                    return
+
+        if return_code != 0:
+            logging.error(
+                "Buildbox-casd didn't exit cleanly. Exit code: %d", return_code
+            )
+            logging.error(self._casd_process.stdout.read().decode())
+
+        self._casd_process = None
+
+    # get_socket_path():
+    #
+    # Get the path to the socket of the CASd process - None if the
+    # process has not been started yet.
+    #
+    def get_socket_path(self) -> str:
+        assert self._casd_socket_path is not None, "CASd has not been started"
+        return self._casd_socket_path
+
+    # get_casdir():
+    #
+    # Get the path to the directory managed by CASd.
+    #
+    def get_casdir(self) -> str:
+        return self.casdir
+
+
 # create_server():
 #
 # Create gRPC CAS artifact server as specified in the Remote Execution API.
@@ -96,6 +212,8 @@ def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.WA
     logger.addHandler(handler)
 
     cas = CASCache(os.path.abspath(repo), cache_quota=quota, protect_session_blobs=False)
+    cas_runner = CASdRunner(os.path.abspath(repo), cache_quota=quota)
+    cas_runner.start_casd()
 
     try:
         artifactdir = os.path.join(os.path.abspath(repo), 'artifacts', 'refs')
@@ -137,6 +255,7 @@ def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.WA
 
     finally:
         cas.release_resources()
+        cas_runner.stop()
 
 
 @click.command(short_help="CAS Artifact Server")