Posted to commits@buildstream.apache.org by ro...@apache.org on 2020/12/29 13:48:13 UTC

[buildstream] branch jmac/hackathon-rbe-execution created (now 046739a)

This is an automated email from the ASF dual-hosted git repository.

root pushed a change to branch jmac/hackathon-rbe-execution
in repository https://gitbox.apache.org/repos/asf/buildstream.git.


      at 046739a  A series of terrible hacks to make BuildStream run things on the RBE test instance. Missing various checks.

This branch includes the following new commits:

     new 046739a  A series of terrible hacks to make BuildStream run things on the RBE test instance. Missing various checks.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[buildstream] 01/01: A series of terrible hacks to make BuildStream run things on the RBE test instance. Missing various checks.

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

root pushed a commit to branch jmac/hackathon-rbe-execution
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 046739aa1cfe6e78f1485a586c706b106d363a2a
Author: Jim MacArthur <ji...@codethink.co.uk>
AuthorDate: Thu Oct 11 20:57:09 2018 +0100

    A series of terrible hacks to make BuildStream run things on the RBE test instance. Missing various checks.
---
 buildstream/_artifactcache/cascache.py | 94 +++++++++++++---------------------
 buildstream/sandbox/_sandboxremote.py  | 24 +++++++--
 2 files changed, 56 insertions(+), 62 deletions(-)
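
Two themes run through the diff below: the hand-rolled TLS channel setup is
swapped for Google-authenticated gRPC channels, and every Remote Execution
API request gains a hard-coded RBE instance name. As a minimal sketch of the
channel change, consolidated from the scattered hunks (the key path
/tmp/key.json and the RBE endpoint are test values hard-coded by this
commit, not general configuration):

    from google.oauth2 import service_account
    from google.auth.transport import grpc as google_auth_transport_grpc
    from google.auth.transport import requests as google_auth_transport_requests

    SCOPES = ['https://www.googleapis.com/auth/cloud-platform']

    # Load a service-account key and wrap it in an OAuth-authorized gRPC
    # channel; this replaces the previous grpc.ssl_channel_credentials() /
    # grpc.secure_channel() pair.
    credentials = service_account.Credentials.from_service_account_file(
        '/tmp/key.json', scopes=SCOPES)
    http_request = google_auth_transport_requests.Request()
    channel = google_auth_transport_grpc.secure_authorized_channel(
        credentials, http_request, 'remotebuildexecution.googleapis.com:443')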

diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 3e63608..2d79abd 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -43,6 +43,11 @@ from .._exceptions import ArtifactError
 
 from . import ArtifactCache
 
+from google.oauth2 import service_account
+import google
+from google.auth.transport import grpc as google_auth_transport_grpc
+from google.auth.transport import requests as google_auth_transport_requests
+
 
 # The default limit for gRPC messages is 4 MiB.
 # Limit payload to 1 MiB to leave sufficient headroom for metadata.
@@ -232,7 +237,7 @@ class CASCache(ArtifactCache):
         ref = self.get_artifact_fullname(element, key)
 
         project = element._get_project()
-
+        return False
         for remote in self._remotes[project]:
             try:
                 remote.init()
@@ -299,29 +304,8 @@ class CASCache(ArtifactCache):
             for ref in refs:
                 tree = self.resolve_ref(ref)
 
-                # Check whether ref is already on the server in which case
-                # there is no need to push the artifact
-                try:
-                    request = buildstream_pb2.GetReferenceRequest()
-                    request.key = ref
-                    response = remote.ref_storage.GetReference(request)
-
-                    if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
-                        # ref is already on the server with the same tree
-                        continue
-
-                except grpc.RpcError as e:
-                    if e.code() != grpc.StatusCode.NOT_FOUND:
-                        # Intentionally re-raise RpcError for outer except block.
-                        raise
-
                 self._send_directory(remote, tree)
 
-                request = buildstream_pb2.UpdateReferenceRequest()
-                request.keys.append(ref)
-                request.digest.hash = tree.hash
-                request.digest.size_bytes = tree.size_bytes
-                remote.ref_storage.UpdateReference(request)
 
                 skipped_remote = False
         except grpc.RpcError as e:
@@ -367,9 +351,9 @@ class CASCache(ArtifactCache):
 
         push_remotes = [r for r in self._remotes[project] if r.spec.push]
 
-        if not push_remotes:
-            raise ArtifactError("CASCache: push_directory was called, but no remote artifact " +
-                                "servers are configured as push remotes.")
+        #if not push_remotes:
+        #    raise ArtifactError("CASCache: push_directory was called, but no remote artifact " +
+        #                        "servers are configured as push remotes.")
 
         if directory.ref is None:
             return
@@ -400,7 +384,7 @@ class CASCache(ArtifactCache):
     def _verify_digest_on_remote(self, remote, digest):
         # Check whether ref is already on the server in which case
         # there is no need to push the artifact
-        request = remote_execution_pb2.FindMissingBlobsRequest()
+        request = remote_execution_pb2.FindMissingBlobsRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance")
         request.blob_digests.extend([digest])
 
         response = remote.cas.FindMissingBlobs(request)
@@ -812,13 +796,17 @@ class CASCache(ArtifactCache):
             remote = _CASRemote(remote_spec)
             remote.init()
 
-            request = buildstream_pb2.StatusRequest()
-            response = remote.ref_storage.Status(request)
-
-            if remote_spec.push and not response.allow_updates:
+            print("Remote init complete - firing status request...")
+            #request = buildstream_pb2.StatusRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance")
+            #response = remote.ref_storage.Status(request)
+            print("Status obtained")
+            
+            if False:# remote_spec.push:
+                print("Cannot push")
                 q.put('Artifact server does not allow push')
             else:
                 # No error
+                print("No errors detected")
                 q.put(None)
 
         except grpc.RpcError as e:
@@ -1000,9 +988,10 @@ class CASCache(ArtifactCache):
         return dirdigest
 
     def _send_blob(self, remote, digest, stream, u_uid=uuid.uuid4()):
-        resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
-                                  digest.hash, str(digest.size_bytes)])
-
+        instance_name="projects/bazelcon18-rbe-shared/instances/default_instance"
+        resource_name = instance_name+'/'+('/'.join(['uploads', str(u_uid), 'blobs',
+                                  digest.hash, str(digest.size_bytes)]))
+        print("Trying to send resource named: {}".format(resource_name))
         def request_stream(resname, instream):
             offset = 0
             finished = False
@@ -1033,7 +1022,7 @@ class CASCache(ArtifactCache):
         missing_blobs = dict()
         # Limit size of FindMissingBlobs request
         for required_blobs_group in _grouper(required_blobs, 512):
-            request = remote_execution_pb2.FindMissingBlobsRequest()
+            request = remote_execution_pb2.FindMissingBlobsRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance")
 
             for required_digest in required_blobs_group:
                 d = request.blob_digests.add()
@@ -1093,28 +1082,14 @@ class _CASRemote():
             elif url.scheme == 'https':
                 port = url.port or 443
 
-                if self.spec.server_cert:
-                    with open(self.spec.server_cert, 'rb') as f:
-                        server_cert_bytes = f.read()
-                else:
-                    server_cert_bytes = None
-
-                if self.spec.client_key:
-                    with open(self.spec.client_key, 'rb') as f:
-                        client_key_bytes = f.read()
-                else:
-                    client_key_bytes = None
+                SCOPES = ['https://www.googleapis.com/auth/cloud-platform']
+                SERVICE_ACCOUNT_FILE = '/tmp/key.json'
 
-                if self.spec.client_cert:
-                    with open(self.spec.client_cert, 'rb') as f:
-                        client_cert_bytes = f.read()
-                else:
-                    client_cert_bytes = None
+                credentials = service_account.Credentials.from_service_account_file(
+	            SERVICE_ACCOUNT_FILE, scopes=SCOPES)
+                http_request = google_auth_transport_requests.Request()
 
-                credentials = grpc.ssl_channel_credentials(root_certificates=server_cert_bytes,
-                                                           private_key=client_key_bytes,
-                                                           certificate_chain=client_cert_bytes)
-                self.channel = grpc.secure_channel('{}:{}'.format(url.hostname, port), credentials)
+                self.channel = google_auth_transport_grpc.secure_authorized_channel(credentials, http_request, 'remotebuildexecution.googleapis.com:443')
             else:
                 raise ArtifactError("Unsupported URL: {}".format(self.spec.url))
 
@@ -1125,7 +1100,7 @@ class _CASRemote():
 
             self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
             try:
-                request = remote_execution_pb2.GetCapabilitiesRequest()
+                request = remote_execution_pb2.GetCapabilitiesRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance")
                 response = self.capabilities.GetCapabilities(request)
                 server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
                 if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
@@ -1136,9 +1111,10 @@ class _CASRemote():
                     raise
 
             # Check whether the server supports BatchReadBlobs()
+            print("Testing BatchBlobsRequest")
             self.batch_read_supported = False
             try:
-                request = remote_execution_pb2.BatchReadBlobsRequest()
+                request = remote_execution_pb2.BatchReadBlobsRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance")
                 response = self.cas.BatchReadBlobs(request)
                 self.batch_read_supported = True
             except grpc.RpcError as e:
@@ -1146,9 +1122,10 @@ class _CASRemote():
                     raise
 
             # Check whether the server supports BatchUpdateBlobs()
+            print("Testing BatchUpdateBlobsRequest")
             self.batch_update_supported = False
             try:
-                request = remote_execution_pb2.BatchUpdateBlobsRequest()
+                request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance")
                 response = self.cas.BatchUpdateBlobs(request)
                 self.batch_update_supported = True
             except grpc.RpcError as e:
@@ -1156,6 +1133,7 @@ class _CASRemote():
                         e.code() != grpc.StatusCode.PERMISSION_DENIED):
                     raise
 
+            print("Remote init complete")
             self._initialized = True
 
 
@@ -1209,7 +1187,7 @@ class _CASBatchUpdate():
     def __init__(self, remote):
         self._remote = remote
         self._max_total_size_bytes = remote.max_batch_total_size_bytes
-        self._request = remote_execution_pb2.BatchUpdateBlobsRequest()
+        self._request = remote_execution_pb2.BatchUpdateBlobsRequest(instance_name="projects/bazelcon18-rbe-shared/instances/default_instance")
         self._size = 0
         self._sent = False
 
diff --git a/buildstream/sandbox/_sandboxremote.py b/buildstream/sandbox/_sandboxremote.py
index ab0c31b..8c677e5 100644
--- a/buildstream/sandbox/_sandboxremote.py
+++ b/buildstream/sandbox/_sandboxremote.py
@@ -29,6 +29,10 @@ from ..storage._casbaseddirectory import CasBasedDirectory
 from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
 from .._protos.google.rpc import code_pb2
 
+from google.oauth2 import service_account
+import google
+from google.auth.transport import grpc as google_auth_transport_grpc
+from google.auth.transport import requests as google_auth_transport_requests
 
 class SandboxError(Exception):
     pass
@@ -49,6 +53,8 @@ class SandboxRemote(Sandbox):
             raise SandboxError("Configured remote URL '{}' does not match the expected layout. "
                                .format(kwargs['server_url']) +
                                "It should be of the form <protocol>://<domain name>:<port>.")
+        elif url.scheme == 'https':
+            print("Using secure mode to '{}'.".format(url))
         elif url.scheme != 'http':
             raise SandboxError("Configured remote '{}' uses an unsupported protocol. "
                                "Only plain HTTP is currenlty supported (no HTTPS).")
@@ -92,10 +98,18 @@ class SandboxRemote(Sandbox):
             return None
 
         # Next, try to create a communication channel to the BuildGrid server.
-        channel = grpc.insecure_channel(self.server_url)
+        SCOPES = ['https://www.googleapis.com/auth/cloud-platform']
+        SERVICE_ACCOUNT_FILE = '/tmp/key.json'
+        
+        credentials = service_account.Credentials.from_service_account_file(
+	    SERVICE_ACCOUNT_FILE, scopes=SCOPES)
+        http_request = google_auth_transport_requests.Request()
+        channel = google_auth_transport_grpc.secure_authorized_channel(credentials, http_request, 'remotebuildexecution.googleapis.com:443')
+        #channel = grpc.secure_channel(self.server_url, credentials)
         stub = remote_execution_pb2_grpc.ExecutionStub(channel)
         request = remote_execution_pb2.ExecuteRequest(action_digest=action_digest,
-                                                      skip_cache_lookup=False)
+                                                      skip_cache_lookup=False,
+                                                      instance_name="projects/bazelcon18-rbe-shared/instances/default_instance")
         try:
             operation_iterator = stub.Execute(request)
         except grpc.RpcError:
@@ -133,7 +147,7 @@ class SandboxRemote(Sandbox):
         tree_digest = output_directories[0].tree_digest
         if tree_digest is None or not tree_digest.hash:
             raise SandboxError("Output directory structure had no digest attached.")
-
+        print("Output of job: Tree digest is {}/{}".format(tree_digest.hash, tree_digest.size_bytes))
         context = self._get_context()
         cascache = context.artifactcache
         # Now do a pull to ensure we have the necessary parts.
@@ -212,6 +226,8 @@ class SandboxRemote(Sandbox):
                 raise SandboxError("Remote server failed at executing the build request.")
 
         action_result = execution_response.result
+        print("Exit code: {}".format(action_result.exit_code))
+        print("Stdout digest: {}/{}".format(action_result.stdout_digest.hash, action_result.stdout_digest.size_bytes))
 
         if action_result.exit_code != 0:
             # A normal error during the build: the remote execution system
@@ -219,7 +235,7 @@ class SandboxRemote(Sandbox):
             # action_result.stdout and action_result.stderr also contains
             # build command outputs which we ignore at the moment.
             return action_result.exit_code
-
+        
         self.process_job_output(action_result.output_directories, action_result.output_files)
 
         return 0
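
For reference, the execution path in _sandboxremote.py now boils down to the
following shape (a sketch assuming the authorized channel built as above and
an action_digest that has already been uploaded to CAS):

    from buildstream._protos.build.bazel.remote.execution.v2 import (
        remote_execution_pb2, remote_execution_pb2_grpc)

    stub = remote_execution_pb2_grpc.ExecutionStub(channel)
    request = remote_execution_pb2.ExecuteRequest(
        action_digest=action_digest,
        skip_cache_lookup=False,
        instance_name="projects/bazelcon18-rbe-shared/instances/default_instance")

    # Execute() streams google.longrunning Operation messages; the final
    # one carries the ActionResult with the exit code and output digests.
    last_operation = None
    for operation in stub.Execute(request):
        last_operation = operation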