You are viewing a plain text version of this content. The canonical link was not preserved in this plain-text copy.
Posted to commits@buildstream.apache.org by ro...@apache.org on 2020/12/29 13:48:14 UTC

[buildstream] 01/01: A series of terrible hacks to make BuildStream run things on the RBE test instance. Missing various checks.

This is an automated email from the ASF dual-hosted git repository.

root pushed a commit to branch jmac/hackathon-rbe-execution
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 046739aa1cfe6e78f1485a586c706b106d363a2a
Author: Jim MacArthur <ji...@codethink.co.uk>
AuthorDate: Thu Oct 11 20:57:09 2018 +0100

    A series of terrible hacks to make BuildStream run things on the RBE test instance. Missing various checks.
---
 buildstream/_artifactcache/cascache.py | 94 +++++++++++++---------------------
 buildstream/sandbox/_sandboxremote.py  | 24 +++++++--
 2 files changed, 56 insertions(+), 62 deletions(-)

diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 3e63608..2d79abd 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -43,6 +43,11 @@ from .._exceptions import ArtifactError
 
 from . import ArtifactCache
 
+from google.oauth2 import service_account
+import google
+from google.auth.transport import grpc as google_auth_transport_grpc
+from google.auth.transport import requests as google_auth_transport_requests
+
 
 # The default limit for gRPC messages is 4 MiB.
 # Limit payload to 1 MiB to leave sufficient headroom for metadata.
@@ -232,7 +237,7 @@ class CASCache(ArtifactCache):
         ref = self.get_artifact_fullname(element, key)
 
         project = element._get_project()
-
+        return False
         for remote in self._remotes[project]:
             try:
                 remote.init()
@@ -299,29 +304,8 @@ class CASCache(ArtifactCache):
             for ref in refs:
                 tree = self.resolve_ref(ref)
 
-                # Check whether ref is already on the server in which case
-                # there is no need to push the artifact
-                try:
-                    request = buildstream_pb2.GetReferenceRequest()
-                    request.key = ref
-                    response = remote.ref_storage.GetReference(request)
-
-                    if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
-                        # ref is already on the server with the same tree
-                        continue
-
-                except grpc.RpcError as e:
-                    if e.code() != grpc.StatusCode.NOT_FOUND:
-                        # Intentionally re-raise RpcError for outer except block.
-                        raise
-
                 self._send_directory(remote, tree)
 
-                request = buildstream_pb2.UpdateReferenceRequest()
-                request.keys.append(ref)
-                request.digest.hash = tree.hash
-                request.digest.size_bytes = tree.size_bytes
-                remote.ref_storage.UpdateReference(request)
 
                 skipped_remote = False
         except grpc.RpcError as e:
@@ -367,9 +351,9 @@ class CASCache(ArtifactCache):
 
         push_remotes = [r for r in self._remotes[project] if r.spec.push]
 
-        if not push_remotes:
-            raise ArtifactError("CASCache: push_directory was called, but no remote artifact " +
-                                "servers are configured as push remotes.")
+        #if not push_remotes:
+        #    raise ArtifactError("CASCache: push_directory was called, but no remote artifact " +
+        #                        "servers are configured as push remotes.")
 
         if directory.ref is None:
             return
@@ -400,7 +384,7 @@ class CASCache(ArtifactCache):
     def _verify_digest_on_remote(self, remote, digest):
         # Check whether ref is already on the server in which case
         # there is no need to push the artifact
-        request = remote_execution_pb2.FindMissingBlobsRequest()
+        request = remote_execution_pb2.FindMissingBlobsRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance")
         request.blob_digests.extend([digest])
 
         response = remote.cas.FindMissingBlobs(request)
@@ -812,13 +796,17 @@ class CASCache(ArtifactCache):
             remote = _CASRemote(remote_spec)
             remote.init()
 
-            request = buildstream_pb2.StatusRequest()
-            response = remote.ref_storage.Status(request)
-
-            if remote_spec.push and not response.allow_updates:
+            print("Remote init complete - firing status request...")
+            #request = buildstream_pb2.StatusRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance")
+            #response = remote.ref_storage.Status(request)
+            print("Status obtained")
+            
+            if False:# remote_spec.push:
+                print("Cannot push")
                 q.put('Artifact server does not allow push')
             else:
                 # No error
+                print("No errors detected")
                 q.put(None)
 
         except grpc.RpcError as e:
@@ -1000,9 +988,10 @@ class CASCache(ArtifactCache):
         return dirdigest
 
     def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
-        resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
-                                  digest.hash, str(digest.size_bytes)])
-
+        instance_name="projects/bazelcon18-rbe-shared/instances/default_instance"
+        resource_name = instance_name+'/'+('/'.join(['uploads', str(u_uid), 'blobs',
+                                  digest.hash, str(digest.size_bytes)]))
+        print("Trying to send resource named: {}".format(resource_name))
         def request_stream(resname, instream):
             offset = 0
             finished = False
@@ -1033,7 +1022,7 @@ class CASCache(ArtifactCache):
         missing_blobs = dict()
         # Limit size of FindMissingBlobs request
         for required_blobs_group in _grouper(required_blobs, 512):
-            request = remote_execution_pb2.FindMissingBlobsRequest()
+            request = remote_execution_pb2.FindMissingBlobsRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance")
 
             for required_digest in required_blobs_group:
                 d = request.blob_digests.add()
@@ -1093,28 +1082,14 @@ class _CASRemote():
             elif url.scheme == 'https':
                 port = url.port or 443
 
-                if self.spec.server_cert:
-                    with open(self.spec.server_cert, 'rb') as f:
-                        server_cert_bytes = f.read()
-                else:
-                    server_cert_bytes = None
-
-                if self.spec.client_key:
-                    with open(self.spec.client_key, 'rb') as f:
-                        client_key_bytes = f.read()
-                else:
-                    client_key_bytes = None
+                SCOPES = ['https://www.googleapis.com/auth/cloud-platform']
+                SERVICE_ACCOUNT_FILE = '/tmp/key.json'
 
-                if self.spec.client_cert:
-                    with open(self.spec.client_cert, 'rb') as f:
-                        client_cert_bytes = f.read()
-                else:
-                    client_cert_bytes = None
+                credentials = service_account.Credentials.from_service_account_file(
+	            SERVICE_ACCOUNT_FILE, scopes=SCOPES)
+                http_request = google_auth_transport_requests.Request()
 
-                credentials = grpc.ssl_channel_credentials(root_certificates=server_cert_bytes,
-                                                           private_key=client_key_bytes,
-                                                           certificate_chain=client_cert_bytes)
-                self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials)
+                self.channel = google_auth_transport_grpc.secure_authorized_channel(credentials, http_request, 'remotebuildexecution.googleapis.com:443')
             else:
                 raise ArtifactError("Unsupported URL: {}".format(self.spec.url))
 
@@ -1125,7 +1100,7 @@ class _CASRemote():
 
             self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
             try:
-                request = remote_execution_pb2.GetCapabilitiesRequest()
+                request = remote_execution_pb2.GetCapabilitiesRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance")
                 response = self.capabilities.GetCapabilities(request)
                 server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
                 if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
@@ -1136,9 +1111,10 @@ class _CASRemote():
                     raise
 
             # Check whether the server supports BatchReadBlobs()
+            print("Testing BatchBlobsRequest")
             self.batch_read_supported = False
             try:
-                request = remote_execution_pb2.BatchReadBlobsRequest()
+                request = remote_execution_pb2.BatchReadBlobsRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance")
                 response = self.cas.BatchReadBlobs(request)
                 self.batch_read_supported = True
             except grpc.RpcError as e:
@@ -1146,9 +1122,10 @@ class _CASRemote():
                     raise
 
             # Check whether the server supports BatchUpdateBlobs()
+            print("Testing BatchUpdateBlobsRequest")
             self.batch_update_supported = False
             try:
-                request = remote_execution_pb2.BatchUpdateBlobsRequest()
+                request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance")
                 response = self.cas.BatchUpdateBlobs(request)
                 self.batch_update_supported = True
             except grpc.RpcError as e:
@@ -1156,6 +1133,7 @@ class _CASRemote():
                         e.code() != grpc.StatusCode.PERMISSION_DENIED):
                     raise
 
+            print("Remote init complete")
             self._initialized = True
 
 
@@ -1209,7 +1187,7 @@ class _CASBatchUpdate():
     def __init__(self, remote):
         self._remote = remote
         self._max_total_size_bytes = remote.max_batch_total_size_bytes
-        self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
+        self._request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance")
         self._size = 0
         self._sent = False
 
diff --git a/buildstream/sandbox/_sandboxremote.py b/buildstream/sandbox/_sandboxremote.py
index ab0c31b..8c677e5 100644
--- a/buildstream/sandbox/_sandboxremote.py
+++ b/buildstream/sandbox/_sandboxremote.py
@@ -29,6 +29,10 @@ from ..storage._casbaseddirectory import CasBasedDirectory
 from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 from .._protos.google.rpc import code_pb2
 
+from google.oauth2 import service_account
+import google
+from google.auth.transport import grpc as google_auth_transport_grpc
+from google.auth.transport import requests as google_auth_transport_requests
 
 class SandboxError(Exception):
     pass
@@ -49,6 +53,8 @@ class SandboxRemote(Sandbox):
             raise SandboxError("Configured remote URL '{}' does not match the expected layout. "
                                .format(kwargs['server_url']) +
                                "It should be of the form <protocol>://<domain name>:<port>.")
+        elif url.scheme == 'https':
+            print("Using secure mode to '{}'.".format(url))
         elif url.scheme != 'http':
             raise SandboxError("Configured remote '{}' uses an unsupported protocol. "
                                "Only plain HTTP is currenlty supported (no HTTPS).")
@@ -92,10 +98,18 @@ class SandboxRemote(Sandbox):
             return None
 
         # Next, try to create a communication channel to the BuildGrid server.
-        channel = grpc.insecure_channel(self.server_url)
+        SCOPES = ['https://www.googleapis.com/auth/cloud-platform']
+        SERVICE_ACCOUNT_FILE = '/tmp/key.json'
+        
+        credentials = service_account.Credentials.from_service_account_file(
+	    SERVICE_ACCOUNT_FILE, scopes=SCOPES)
+        http_request = google_auth_transport_requests.Request()
+        channel = google_auth_transport_grpc.secure_authorized_channel(credentials, http_request, 'remotebuildexecution.googleapis.com:443')
+        #channel = grpc.secure_channel(self.server_url, credentials)
         stub = remote_execution_pb2_grpc.ExecutionStub(channel)
         request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest,
-                                                      skip_cache_lookup=False)
+                                                      skip_cache_lookup=False,
+                                                      instance_name="projects/bazelcon18-rbe-shared/instances/default_instance")
         try:
             operation_iterator = stub.Execute(request)
         except grpc.RpcError:
@@ -133,7 +147,7 @@ class SandboxRemote(Sandbox):
         tree_digest = output_directories[0].tree_digest
         if tree_digest is None or not tree_digest.hash:
             raise SandboxError("Output directory structure had no digest attached.")
-
+        print("Output of job: Tree digest is {}/{}".format(tree_digest.hash, tree_digest.size_bytes))
         context = self._get_context()
         cascache = context.artifactcache
         # Now do a pull to ensure we have the necessary parts.
@@ -212,6 +226,8 @@ class SandboxRemote(Sandbox):
                 raise SandboxError("Remote server failed at executing the build request.")
 
         action_result = execution_response.result
+        print("Exit code: {}".format(action_result.exit_code))
+        print("Stdout digest: {}/{}".format(action_result.stdout_digest.hash, action_result.stdout_digest.size_bytes))
 
         if action_result.exit_code != 0:
             # A normal error during the build: the remote execution system
@@ -219,7 +235,7 @@ class SandboxRemote(Sandbox):
             # action_result.stdout and action_result.stderr also contains
             # build command outputs which we ignore at the moment.
             return action_result.exit_code
-
+        
         self.process_job_output(action_result.output_directories, action_result.output_files)
 
         return 0