Posted to commits@buildstream.apache.org by ro...@apache.org on 2020/12/29 13:46:19 UTC

[buildstream] 07/19: casserver.py: Use FetchTree instead of directly updating mtimes

This is an automated email from the ASF dual-hosted git repository.

root pushed a commit to branch tlater/casd-socket-permissions
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 502ea5c578401794a93b8898074bbb1747c6e0ae
Author: Tristan Maat <tr...@codethink.co.uk>
AuthorDate: Tue Oct 29 17:01:53 2019 +0000

    casserver.py: Use FetchTree instead of directly updating mtimes
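    
    The mtime refreshing now goes through casd rather than touching objects
    on disk: for a complete tree, the server asks the LocalCAS service to
    fetch it, with fetch_file_blobs enabled so the file blobs are covered as
    well as the directory nodes. A minimal sketch of that call pattern,
    assuming an already-connected gRPC channel to casd ("channel" and
    "tree_digest" are placeholder names, not part of this patch):
    
        from .._protos.build.buildgrid import local_cas_pb2, local_cas_pb2_grpc
    
        # Stub for the LocalCAS service exposed by casd
        local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(channel)
    
        request = local_cas_pb2.FetchTreeRequest()
        request.root_digest.CopyFrom(tree_digest)  # e.g. artifact.files
        request.fetch_file_blobs = True            # cover file blobs, not just directories
        local_cas.FetchTree(request)               # grpc.RpcError with NOT_FOUND if the tree is incomplete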
---
 src/buildstream/_cas/casserver.py | 92 +++++++++++++++++++++++++++------------
 1 file changed, 63 insertions(+), 29 deletions(-)

diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py
index e0063d1..5af610f 100644
--- a/src/buildstream/_cas/casserver.py
+++ b/src/buildstream/_cas/casserver.py
@@ -35,11 +35,20 @@ import grpc
 from google.protobuf.message import DecodeError
 import click
 
-from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
+from .._protos.build.bazel.remote.execution.v2 import (
+    remote_execution_pb2,
+    remote_execution_pb2_grpc,
+)
 from .._protos.google.bytestream import bytestream_pb2_grpc
-from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \
-    artifact_pb2, artifact_pb2_grpc, source_pb2, source_pb2_grpc
-
+from .._protos.build.buildgrid import local_cas_pb2, local_cas_pb2_grpc
+from .._protos.buildstream.v2 import (
+    buildstream_pb2,
+    buildstream_pb2_grpc,
+    artifact_pb2,
+    artifact_pb2_grpc,
+    source_pb2,
+    source_pb2_grpc,
+)
 from ..utils import save_file_atomic, get_host_tool
 
 
@@ -57,6 +66,7 @@ class CASRemote:
     def __init__(self, url: str):
         self._url = url
 
+        self._local_cas = None
         self._bytestream = None
         self._cas = None
 
@@ -94,6 +104,7 @@ class CASRemote:
                 raise
 
         # Set up the RPC stubs
+        self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._channel)
         self._bytestream = bytestream_pb2_grpc.ByteStreamStub(self._channel)
         self._cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._channel)
 
@@ -102,6 +113,11 @@ class CASRemote:
         assert self._cas is not None, "CAS stub was not initialized"
         return self._cas
 
+    def get_local_cas(self) -> local_cas_pb2_grpc.LocalContentAddressableStorageStub:
+        self._initialize_remote()
+        assert self._local_cas is not None, "Local CAS stub was not initialized"
+        return self._local_cas
+
     def get_bytestream(self) -> bytestream_pb2_grpc.ByteStreamStub:
         self._initialize_remote()
         assert self._bytestream is not None, "Bytestream stub was not initialized"
@@ -618,19 +634,20 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
         self.logger.debug("GetReference")
         response = buildstream_pb2.GetReferenceResponse()
 
+        request = remote_execution_pb2.FindMissingBlobsRequest()
+        d = request.blob_digests.add()
+        d.CopyFrom(request.key)
+
         try:
-            tree = self.cas_cache.resolve_ref(request.key)
-            try:
-                self.cas_cache.update_tree_mtime(tree)
-            except FileNotFoundError:
+            ref = self.cas.FindMissingBlobs(request)
+        except grpc.RpcError as err:
+            context.set_code(err.code())
+            if err.code() == grpc.StatusCode.NOT_FOUND:
                 self.cas_cache.remove_ref(request.key)
-                context.set_code(grpc.StatusCode.NOT_FOUND)
-                return response
+            return response
 
-            response.digest.hash = tree.hash
-            response.digest.size_bytes = tree.size_bytes
-        except FileNotFoundError:
-            context.set_code(grpc.StatusCode.NOT_FOUND)
+        response.digest.hash = ref.hash
+        response.digest.size_bytes = ref.size_bytes
 
         return response
 
@@ -661,6 +678,7 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
     def __init__(self, remote, root, cas_cache, *, update_cas=True):
         super().__init__()
         self.cas = remote.get_cas()
+        self.local_cas = remote.get_local_cas()
         self.cas_cache = cas_cache
         self.artifactdir = os.path.join(root, 'artifacts', 'refs')
         self.update_cas = update_cas
@@ -677,6 +695,8 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
         with open(artifact_path, 'rb') as f:
             artifact.ParseFromString(f.read())
 
+        os.utime(artifact_path)
+
         # Artifact-only servers will not have blobs on their system,
         # so we can't reasonably update their mtimes. Instead, we exit
         # early, and let the CAS server deal with its blobs.
@@ -695,28 +715,42 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
         try:
 
             if str(artifact.files):
-                self.cas_cache.update_tree_mtime(artifact.files)
+                request = local_cas_pb2.FetchTreeRequest()
+                request.root_digest.CopyFrom(artifact.files)
+                request.fetch_file_blobs = True
+                self.local_cas.FetchTree(request)
 
             if str(artifact.buildtree):
-                # buildtrees might not be there
                 try:
-                    self.cas_cache.update_tree_mtime(artifact.buildtree)
-                except FileNotFoundError:
-                    pass
+                    request = local_cas_pb2.FetchTreeRequest()
+                    request.root_digest.CopyFrom(artifact.buildtree)
+                    request.fetch_file_blobs = True
+                    self.local_cas.FetchTree(request)
+                except grpc.RpcError as err:
+                    # buildtrees might not be there
+                    if err.code() != grpc.StatusCode.NOT_FOUND:
+                        raise
 
             if str(artifact.public_data):
-                os.utime(self.cas_cache.object_path(artifact.public_data))
+                request = remote_execution_pb2.FindMissingBlobsRequest()
+                d = request.blob_digests.add()
+                d.CopyFrom(artifact.public_data)
+                self.cas.FindMissingBlobs(request)
 
+            request = remote_execution_pb2.FindMissingBlobsRequest()
             for log_file in artifact.logs:
-                os.utime(self.cas_cache.object_path(log_file.digest))
-
-        except FileNotFoundError:
-            os.unlink(artifact_path)
-            context.abort(grpc.StatusCode.NOT_FOUND,
-                          "Artifact files incomplete")
-        except DecodeError:
-            context.abort(grpc.StatusCode.NOT_FOUND,
-                          "Artifact files not valid")
+                d = request.blob_digests.add()
+                d.CopyFrom(log_file.digest)
+            self.cas.FindMissingBlobs(request)
+
+        except grpc.RpcError as err:
+            if err.code() == grpc.StatusCode.NOT_FOUND:
+                os.unlink(artifact_path)
+                context.abort(grpc.StatusCode.NOT_FOUND,
+                              "Artifact files incomplete")
+            else:
+                context.abort(grpc.StatusCode.NOT_FOUND,
+                              "Artifact files not valid")
 
         return artifact
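
For single blobs (the public data and the log files above) there is no tree to
fetch, so the lookups go through FindMissingBlobs on the CAS stub instead of
os.utime() on the object paths. A minimal sketch of that pattern, assuming a
connected gRPC channel and an iterable of digests ("channel" and "digests" are
placeholder names, not part of this patch):

    from .._protos.build.bazel.remote.execution.v2 import (
        remote_execution_pb2,
        remote_execution_pb2_grpc,
    )

    cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)

    request = remote_execution_pb2.FindMissingBlobsRequest()
    for digest in digests:
        request.blob_digests.add().CopyFrom(digest)

    # Digests echoed back in missing_blob_digests are absent from the cache;
    # an empty list means every queried blob is present.
    response = cas.FindMissingBlobs(request)
    missing = list(response.missing_blob_digests)

As with FetchTree above, the point is to let casd observe the access itself
rather than have the Python server update mtimes directly.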