You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by ro...@apache.org on 2020/12/29 13:46:13 UTC

[buildstream] 01/19: casserver.py: Add logging

This is an automated email from the ASF dual-hosted git repository.

root pushed a commit to branch tlater/casd-socket-permissions
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit abaa1d1ec6cf858de0eac48271b0bff3d5b71139
Author: Tristan Maat <tr...@codethink.co.uk>
AuthorDate: Fri Oct 11 15:19:19 2019 +0100

    casserver.py: Add logging
---
 src/buildstream/_cas/casserver.py | 76 ++++++++++++++++++++++++++++++++++++---
 1 file changed, 71 insertions(+), 5 deletions(-)

diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py
index d424143..d5a29a3 100644
--- a/src/buildstream/_cas/casserver.py
+++ b/src/buildstream/_cas/casserver.py
@@ -19,6 +19,8 @@
 
 from concurrent import futures
 from contextlib import contextmanager
+from enum import Enum
+import logging
 import os
 import signal
 import sys
@@ -41,12 +43,41 @@ from .._exceptions import CASError, CASCacheError
 
 from .cascache import CASCache
 
-
 # The default limit for gRPC messages is 4 MiB.
 # Limit payload to 1 MiB to leave sufficient headroom for metadata.
 _MAX_PAYLOAD_BYTES = 1024 * 1024
 
 
+# LogLevel():
+#
+# Represents the buildbox-casd log level.
+#
+class LogLevel(Enum):
+    WARNING = "warning"
+    INFO = "info"
+    TRACE = "trace"
+
+    @classmethod
+    def get_logging_equivalent(cls, level: 'LogLevel') -> int:
+        equivalents = {
+            cls.WARNING: logging.WARNING,
+            cls.INFO: logging.INFO,
+            cls.TRACE: logging.DEBUG
+        }
+
+        # Yes, logging.WARNING/INFO/DEBUG are ints
+        # I also don't know why
+        return equivalents[level]
+
+
+class ClickLogLevel(click.Choice):
+    def __init__(self):
+        super().__init__([m.lower() for m in LogLevel._member_names_])  # pylint: disable=no-member
+
+    def convert(self, value, param, ctx):
+        return LogLevel(super().convert(value, param, ctx))
+
+
 # create_server():
 #
 # Create gRPC CAS artifact server as specified in the Remote Execution API.
@@ -56,8 +87,14 @@ _MAX_PAYLOAD_BYTES = 1024 * 1024
 #     enable_push (bool): Whether to allow blob uploads and artifact updates
 #     index_only (bool): Whether to store CAS blobs or only artifacts
 #
-@contextmanager
-def create_server(repo, *, enable_push, quota, index_only):
+@contextlib.contextmanager
+def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.WARNING):
+    logger = logging.getLogger('casserver')
+    logger.setLevel(LogLevel.get_logging_equivalent(log_level))
+    handler = logging.StreamHandler(sys.stderr)
+    handler.setLevel(LogLevel.get_logging_equivalent(log_level))
+    logger.addHandler(handler)
+
     cas = CASCache(os.path.abspath(repo), cache_quota=quota, protect_session_blobs=False)
 
     try:
@@ -113,9 +150,11 @@ def create_server(repo, *, enable_push, quota, index_only):
               help="Maximum disk usage in bytes")
 @click.option('--index-only', is_flag=True,
               help="Only provide the BuildStream artifact and source services (\"index\"), not the CAS (\"storage\")")
+@click.option('--log-level', type=ClickLogLevel(),
+              help="The log level to launch with")
 @click.argument('repo')
 def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
-                quota, index_only):
+                quota, index_only, log_level):
     # Handle SIGTERM by calling sys.exit(0), which will raise a SystemExit exception,
     # properly executing cleanup code in `finally` clauses and context managers.
     # This is required to terminate buildbox-casd on SIGTERM.
@@ -124,7 +163,8 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
     with create_server(repo,
                        quota=quota,
                        enable_push=enable_push,
-                       index_only=index_only) as server:
+                       index_only=index_only,
+                       log_level=log_level) as server:
 
         use_tls = bool(server_key)
 
@@ -170,8 +210,10 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.logger = logging.getLogger("casserver")
 
     def Read(self, request, context):
+        self.logger.info("Read")
         resource_name = request.resource_name
         client_digest = _digest_from_download_resource_name(resource_name)
         if client_digest is None:
@@ -206,6 +248,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
             context.set_code(grpc.StatusCode.NOT_FOUND)
 
     def Write(self, request_iterator, context):
+        self.logger.info("Write")
         response = bytestream_pb2.WriteResponse()
 
         if not self.enable_push:
@@ -285,8 +328,10 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.logger = logging.getLogger("casserver")
 
     def FindMissingBlobs(self, request, context):
+        self.logger.info("FindMissingBlobs")
         response = remote_execution_pb2.FindMissingBlobsResponse()
         for digest in request.blob_digests:
             objpath = self.cas.objpath(digest)
@@ -303,6 +348,8 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
         return response
 
     def BatchReadBlobs(self, request, context):
+        self.logger.info("BatchReadBlobs")
+        self.logger.debug(request.digests)
         response = remote_execution_pb2.BatchReadBlobsResponse()
         batch_size = 0
 
@@ -331,6 +378,8 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
         return response
 
     def BatchUpdateBlobs(self, request, context):
+        self.logger.info("BatchUpdateBlobs")
+        self.logger.debug([request.digest for request in request.requests])
         response = remote_execution_pb2.BatchUpdateBlobsResponse()
 
         if not self.enable_push:
@@ -375,7 +424,11 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
 
 
 class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
+    def __init__(self):
+        self.logger = logging.getLogger("casserver")
+
     def GetCapabilities(self, request, context):
+        self.logger.info("GetCapabilities")
         response = remote_execution_pb2.ServerCapabilities()
 
         cache_capabilities = response.cache_capabilities
@@ -396,8 +449,10 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.logger = logging.getLogger("casserver")
 
     def GetReference(self, request, context):
+        self.logger.debug("GetReference")
         response = buildstream_pb2.GetReferenceResponse()
 
         try:
@@ -417,6 +472,7 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
         return response
 
     def UpdateReference(self, request, context):
+        self.logger.debug("UpdateReference")
         response = buildstream_pb2.UpdateReferenceResponse()
 
         if not self.enable_push:
@@ -429,6 +485,7 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
         return response
 
     def Status(self, request, context):
+        self.logger.debug("Status")
         response = buildstream_pb2.StatusResponse()
 
         response.allow_updates = self.enable_push
@@ -444,8 +501,11 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
         self.artifactdir = artifactdir
         self.update_cas = update_cas
         os.makedirs(artifactdir, exist_ok=True)
+        self.logger = logging.getLogger("casserver")
 
     def GetArtifact(self, request, context):
+        self.logger.info("GetArtifact")
+        self.logger.debug(request.cache_key)
         artifact_path = os.path.join(self.artifactdir, request.cache_key)
         if not os.path.exists(artifact_path):
             context.abort(grpc.StatusCode.NOT_FOUND, "Artifact proto not found")
@@ -498,6 +558,8 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
         return artifact
 
     def UpdateArtifact(self, request, context):
+        self.logger.info("UpdateArtifact")
+        self.logger.debug(request.cache_key)
         artifact = request.artifact
 
         if self.update_cas:
@@ -522,6 +584,7 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
         return artifact
 
     def ArtifactStatus(self, request, context):
+        self.logger.info("ArtifactStatus")
         return artifact_pb2.ArtifactStatusResponse()
 
     def _check_directory(self, name, digest, context):
@@ -530,9 +593,11 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
             with open(self.cas.objpath(digest), 'rb') as f:
                 directory.ParseFromString(f.read())
         except FileNotFoundError:
+            self.logger.warning("Artifact %s specified but no files found (%s)", name, self.cas.objpath(digest))
             context.abort(grpc.StatusCode.FAILED_PRECONDITION,
                           "Artifact {} specified but no files found".format(name))
         except DecodeError:
+            self.logger.warning("Artifact %s specified but directory not found (%s)", name, self.cas.objpath(digest))
             context.abort(grpc.StatusCode.FAILED_PRECONDITION,
                           "Artifact {} specified but directory not found".format(name))
 
@@ -557,6 +622,7 @@ class _BuildStreamCapabilitiesServicer(buildstream_pb2_grpc.CapabilitiesServicer
 class _SourceServicer(source_pb2_grpc.SourceServiceServicer):
     def __init__(self, sourcedir):
         self.sourcedir = sourcedir
+        self.logger = logging.getLogger("casserver")
 
     def GetSource(self, request, context):
         try: