You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by ro...@apache.org on 2020/12/29 13:46:19 UTC
[buildstream] 07/19: casserver.py: Use FetchTree instead of directly updating mtimes
This is an automated email from the ASF dual-hosted git repository.
root pushed a commit to branch tlater/casd-socket-permissions
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 502ea5c578401794a93b8898074bbb1747c6e0ae
Author: Tristan Maat <tr...@codethink.co.uk>
AuthorDate: Tue Oct 29 17:01:53 2019 +0000
casserver.py: Use FetchTree instead of directly updating mtimes
---
src/buildstream/_cas/casserver.py | 92 +++++++++++++++++++++++++++------------
1 file changed, 63 insertions(+), 29 deletions(-)
diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py
index e0063d1..5af610f 100644
--- a/src/buildstream/_cas/casserver.py
+++ b/src/buildstream/_cas/casserver.py
@@ -35,11 +35,20 @@ import grpc
from google.protobuf.message import DecodeError
import click
-from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
+from .._protos.build.bazel.remote.execution.v2 import (
+ remote_execution_pb2,
+ remote_execution_pb2_grpc,
+)
from .._protos.google.bytestream import bytestream_pb2_grpc
-from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \
- artifact_pb2, artifact_pb2_grpc, source_pb2, source_pb2_grpc
-
+from .._protos.build.buildgrid import local_cas_pb2, local_cas_pb2_grpc
+from .._protos.buildstream.v2 import (
+ buildstream_pb2,
+ buildstream_pb2_grpc,
+ artifact_pb2,
+ artifact_pb2_grpc,
+ source_pb2,
+ source_pb2_grpc,
+)
from ..utils import save_file_atomic, get_host_tool
@@ -57,6 +66,7 @@ class CASRemote:
def __init__(self, url: str):
self._url = url
+ self._local_cas = None
self._bytestream = None
self._cas = None
@@ -94,6 +104,7 @@ class CASRemote:
raise
# Set up the RPC stubs
+ self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._channel)
self._bytestream = bytestream_pb2_grpc.ByteStreamStub(self._channel)
self._cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._channel)
@@ -102,6 +113,11 @@ class CASRemote:
assert self._cas is not None, "CAS stub was not initialized"
return self._cas
+ def get_local_cas(self) -> local_cas_pb2_grpc.LocalContentAddressableStorageStub:
+ self._initialize_remote()
+ assert self._local_cas is not None, "Local CAS stub was not initialized"
+ return self._local_cas
+
def get_bytestream(self) -> bytestream_pb2_grpc.ByteStreamStub:
self._initialize_remote()
assert self._bytestream is not None, "Bytestream stub was not initialized"
@@ -618,19 +634,20 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
self.logger.debug("GetReference")
response = buildstream_pb2.GetReferenceResponse()
+ request = remote_execution_pb2.FindMissingBlobsRequest()
+ d = request.blob_digests.add()
+ d.CopyFrom(request.key)
+
try:
- tree = self.cas_cache.resolve_ref(request.key)
- try:
- self.cas_cache.update_tree_mtime(tree)
- except FileNotFoundError:
+ ref = self.cas.FindMissingBlobs(request)
+ except grpc.RpcError as err:
+ context.set_code(err.code())
+ if err.code() == grpc.StatusCode.NOT_FOUND:
self.cas_cache.remove_ref(request.key)
- context.set_code(grpc.StatusCode.NOT_FOUND)
- return response
+ return response
- response.digest.hash = tree.hash
- response.digest.size_bytes = tree.size_bytes
- except FileNotFoundError:
- context.set_code(grpc.StatusCode.NOT_FOUND)
+ response.digest.hash = ref.hash
+ response.digest.size_bytes = ref.size_bytes
return response
@@ -661,6 +678,7 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
def __init__(self, remote, root, cas_cache, *, update_cas=True):
super().__init__()
self.cas = remote.get_cas()
+ self.local_cas = remote.get_local_cas()
self.cas_cache = cas_cache
self.artifactdir = os.path.join(root, 'artifacts', 'refs')
self.update_cas = update_cas
@@ -677,6 +695,8 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
with open(artifact_path, 'rb') as f:
artifact.ParseFromString(f.read())
+ os.utime(artifact_path)
+
# Artifact-only servers will not have blobs on their system,
# so we can't reasonably update their mtimes. Instead, we exit
# early, and let the CAS server deal with its blobs.
@@ -695,28 +715,42 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
try:
if str(artifact.files):
- self.cas_cache.update_tree_mtime(artifact.files)
+ request = local_cas_pb2.FetchTreeRequest()
+ request.root_digest.CopyFrom(artifact.files)
+ request.fetch_file_blobs = True
+ self.local_cas.FetchTree(request)
if str(artifact.buildtree):
- # buildtrees might not be there
try:
- self.cas_cache.update_tree_mtime(artifact.buildtree)
- except FileNotFoundError:
- pass
+ request = local_cas_pb2.FetchTreeRequest()
+ request.root_digest.CopyFrom(artifact.buildtree)
+ request.fetch_file_blobs = True
+ self.local_cas.FetchTree(request)
+ except grpc.RpcError as err:
+ # buildtrees might not be there
+ if err.code() != grpc.StatusCode.NOT_FOUND:
+ raise
if str(artifact.public_data):
- os.utime(self.cas_cache.object_path(artifact.public_data))
+ request = remote_execution_pb2.FindMissingBlobsRequest()
+ d = request.blob_digests.add()
+ d.CopyFrom(artifact.public_data)
+ self.cas.FindMissingBlobs(request)
+ request = remote_execution_pb2.FindMissingBlobsRequest()
for log_file in artifact.logs:
- os.utime(self.cas_cache.object_path(log_file.digest))
-
- except FileNotFoundError:
- os.unlink(artifact_path)
- context.abort(grpc.StatusCode.NOT_FOUND,
- "Artifact files incomplete")
- except DecodeError:
- context.abort(grpc.StatusCode.NOT_FOUND,
- "Artifact files not valid")
+ d = request.blob_digests.add()
+ d.CopyFrom(log_file.digest)
+ self.cas.FindMissingBlobs(request)
+
+ except grpc.RpcError as err:
+ if err.code() == grpc.StatusCode.NOT_FOUND:
+ os.unlink(artifact_path)
+ context.abort(grpc.StatusCode.NOT_FOUND,
+ "Artifact files incomplete")
+ else:
+ context.abort(grpc.StatusCode.NOT_FOUND,
+ "Artifact files not valid")
return artifact