Posted to commits@buildstream.apache.org by gi...@apache.org on 2020/12/29 13:24:38 UTC

[buildstream] 01/01: working copy

This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch finn/cas-error-timeouts
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 4bebfa0474bb49346f83e66f7688311ca4cbc9a8
Author: Finn <fi...@codethink.co.uk>
AuthorDate: Wed Nov 28 13:50:43 2018 +0000

    working copy
---
 tests/artifactcache/pull.py      | 103 ++++++++++++++++-
 tests/testutils/__init__.py      |   1 +
 tests/testutils/timeoutserver.py | 236 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 339 insertions(+), 1 deletion(-)

diff --git a/tests/artifactcache/pull.py b/tests/artifactcache/pull.py
index 4c332bf..bec21d1 100644
--- a/tests/artifactcache/pull.py
+++ b/tests/artifactcache/pull.py
@@ -10,7 +10,7 @@ from buildstream._context import Context
 from buildstream._project import Project
 from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 
-from tests.testutils import cli, create_artifact_share
+from tests.testutils import cli, create_artifact_share, create_timeout_artifact_share
 
 
 # Project directory
@@ -327,3 +327,104 @@ def _test_pull_tree(user_config_file, project_dir, artifact_dir, artifact_digest
         queue.put((directory_digest.hash, directory_digest.size_bytes))
     else:
         queue.put("No remote configured")
+
+
+@pytest.mark.datafiles(DATA_DIR)
+def test_pull_timeout(cli, tmpdir, datafiles):
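+    # Build and push an artifact, then pull it back from a share whose CAS
+    # server (tests/testutils/timeoutserver.py) is set up so that slow or
+    # timing-out responses can be injected.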
+    project_dir = str(datafiles)
+
+    # Set up an artifact cache.
+    with create_timeout_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
+        # Configure artifact share
+        artifact_dir = os.path.join(str(tmpdir), 'cache', 'artifacts')
+        user_config_file = str(tmpdir.join('buildstream.conf'))
+        user_config = {
+            'scheduler': {
+                'pushers': 1
+            },
+            'artifacts': {
+                'url': share.repo,
+                'push': True,
+            }
+        }
+
+        # Write down the user configuration file
+        _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file)
+        # Ensure CLI calls will use it
+        cli.configure(user_config)
+
+        # First build the project with the artifact cache configured
+        result = cli.run(project=project_dir, args=['build', 'target.bst'])
+        result.assert_success()
+
+        # Assert that we are now cached locally
+        assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
+        # Assert that we shared/pushed the cached artifact
+        element_key = cli.get_element_key(project_dir, 'target.bst')
+        assert share.has_artifact('test', 'target.bst', element_key)
+
+        # Delete the artifact locally
+        cli.remove_artifact_from_cache(project_dir, 'target.bst')
+
+        # Assert that we are not cached locally anymore
+        assert cli.get_element_state(project_dir, 'target.bst') != 'cached'
+
+        # Fake minimal context
+        context = Context()
+        context.load(config=user_config_file)
+        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
+        context.set_message_handler(message_handler)
+
+        # Load the project and CAS cache
+        project = Project(project_dir, context)
+        project.ensure_fully_loaded()
+        cas = context.artifactcache
+
+        # Assert that the element's artifact is **not** cached
+        element = project.load_elements(['target.bst'])[0]
+        element_key = cli.get_element_key(project_dir, 'target.bst')
+        assert not cas.contains(element, element_key)
+
+        queue = multiprocessing.Queue()
+        # Use subprocess to avoid creation of gRPC threads in main BuildStream process
+        # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
+        process = multiprocessing.Process(target=_test_pull,
+                                          args=(user_config_file, project_dir, artifact_dir,
+                                                'target.bst', element_key, queue))
+
+        try:
+            # Keep SIGINT blocked in the child process
+            with _signals.blocked([signal.SIGINT], ignore=False):
+                process.start()
+
+            error = queue.get()
+            process.join()
+        except KeyboardInterrupt:
+            utils._kill_process_tree(process.pid)
+            raise
+
+        assert not error
+        assert cas.contains(element, element_key)
diff --git a/tests/testutils/__init__.py b/tests/testutils/__init__.py
index eb7211e..6b64183 100644
--- a/tests/testutils/__init__.py
+++ b/tests/testutils/__init__.py
@@ -30,3 +30,4 @@ from .element_generators import create_element_size, update_element_size
 from .junction import generate_junction
 from .runner_integration import wait_for_cache_granularity
 from .python_repo import setup_pypi_repo
+from .timeoutserver import create_timeout_artifact_share
diff --git a/tests/testutils/timeoutserver.py b/tests/testutils/timeoutserver.py
new file mode 100644
index 0000000..3cc4aed
--- /dev/null
+++ b/tests/testutils/timeoutserver.py
@@ -0,0 +1,236 @@
+import os
+import shutil
+import signal
+import string
+import time
+from collections import namedtuple
+from concurrent import futures
+from contextlib import contextmanager
+from multiprocessing import Process, Queue
+
+import grpc
+import pytest_cov
+
+from buildstream import _yaml
+from buildstream._artifactcache.cascache import CASCache
+from buildstream._artifactcache import casserver
+from buildstream._exceptions import CASError
+from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
+from buildstream._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
+from buildstream._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
+
+
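+# TimeoutCasCache()
+#
+# Subclass of CASCache kept as a hook where timeout or error behaviour
+# could be injected on the storage side; currently it changes nothing.
+#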
+class TimeoutCasCache(CASCache):
+    pass
+
+
+# TimeoutArtifactShare()
+#
+# An artifact share backed by a local CAS server whose servicers are
+# intended to simulate slow or timing-out responses.
+#
+# Args:
+#    directory (str): The base temp directory for the test
+#    total_space (int): Mock total disk space on artifact server
+#    free_space (int): Mock free disk space on artifact server
+#
+class TimeoutArtifactShare():
+
+    def __init__(self, directory, *, total_space=None, free_space=None):
+
+        # The working directory for the artifact share (in case it
+        # needs to do something outside of its backend's storage folder).
+        #
+        self.directory = os.path.abspath(directory)
+
+        # The directory the actual repo will be stored in.
+        #
+        # Unless this gets more complicated, just use this directly
+        # in tests as a remote artifact push/pull configuration
+        #
+        self.repodir = os.path.join(self.directory, 'repo')
+
+        os.makedirs(self.repodir)
+
+        self.cas = CASCache(self.repodir)
+
+        self.total_space = total_space
+        self.free_space = free_space
+
+        q = Queue()
+
+        self.process = Process(target=self.run, args=(q,))
+        self.process.start()
+
+        # Retrieve port from server subprocess
+        port = q.get(timeout=1)
+
+        self.repo = 'http://localhost:{}'.format(port)
+
+    # run():
+    #
+    # Run the artifact server.
+    #
+    def run(self, q):
+        pytest_cov.embed.cleanup_on_sigterm()
+
+        # Optionally mock statvfs
+        if self.total_space:
+            if self.free_space is None:
+                self.free_space = self.total_space
+            os.statvfs = self._mock_statvfs
+
+        server = create_timeout_server(self.repodir, enable_push=True)
+        port = server.add_insecure_port('localhost:0')
+
+        server.start()
+
+        # Send port to parent
+        q.put(port)
+
+        # Sleep until termination by signal
+        signal.pause()
+
+    # has_object():
+    #
+    # Checks whether the object is present in the share
+    #
+    # Args:
+    #    digest (str): The object's digest
+    #
+    # Returns:
+    #    (bool): True if the object exists in the share, otherwise false.
+    def has_object(self, digest):
+
+        assert isinstance(digest, remote_execution_pb2.Digest)
+
+        object_path = self.cas.objpath(digest)
+
+        return os.path.exists(object_path)
+
+    # has_artifact():
+    #
+    # Checks whether the artifact is present in the share
+    #
+    # Args:
+    #    project_name (str): The project name
+    #    element_name (str): The element name
+    #    cache_key (str): The cache key
+    #
+    # Returns:
+    #    (str): artifact digest if the artifact exists in the share, otherwise None.
+    def has_artifact(self, project_name, element_name, cache_key):
+
+        # NOTE: This should be kept in line with our artifact cache code;
+        #       the below is the same algorithm used there for creating
+        #       an artifact reference.
+        #
+
+        # Replace path separator and chop off the .bst suffix
+        element_name = os.path.splitext(element_name.replace(os.sep, '-'))[0]
+
+        valid_chars = string.digits + string.ascii_letters + '-._'
+        element_name = ''.join([
+            x if x in valid_chars else '_'
+            for x in element_name
+        ])
+        artifact_key = '{0}/{1}/{2}'.format(project_name, element_name, cache_key)
+
+        try:
+            tree = self.cas.resolve_ref(artifact_key)
+            return tree
+        except CASError:
+            return None
+
+    # close():
+    #
+    # Remove the artifact share.
+    #
+    def close(self):
+        self.process.terminate()
+        self.process.join()
+
+        shutil.rmtree(self.directory)
+
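+    # _mock_statvfs():
+    #
+    # Replacement for os.statvfs() used when total_space/free_space are
+    # mocked: free space is reported as the configured free space minus
+    # the current size of the repository on disk.
+    #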
+    def _mock_statvfs(self, path):
+        repo_size = 0
+        for root, _, files in os.walk(self.repodir):
+            for filename in files:
+                repo_size += os.path.getsize(os.path.join(root, filename))
+
+        return statvfs_result(f_blocks=self.total_space,
+                              f_bfree=self.free_space - repo_size,
+                              f_bavail=self.free_space - repo_size,
+                              f_bsize=1)
+
+
+# create_timeout_artifact_share()
+#
+# Create a TimeoutArtifactShare for use in a test case
+#
+@contextmanager
+def create_timeout_artifact_share(directory, *, total_space=None, free_space=None):
+    share = TimeoutArtifactShare(directory, total_space=total_space, free_space=free_space)
+    try:
+        yield share
+    finally:
+        share.close()
+
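+# Example usage (illustrative sketch; the paths and element names are the
+# ones used by the new test in tests/artifactcache/pull.py):
+#
+#     with create_timeout_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
+#         cli.configure({'artifacts': {'url': share.repo, 'push': True}})
+#         result = cli.run(project=project_dir, args=['build', 'target.bst'])
+#         result.assert_success()
+#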
+
+statvfs_result = namedtuple('statvfs_result', 'f_blocks f_bfree f_bsize f_bavail')
+
+
+# create_timeout_server():
+#
+# Create a gRPC CAS artifact server, as specified in the Remote Execution
+# API, using the servicer subclasses below so that timeouts can be injected.
+#
+# Args:
+#     repo (str): Path to CAS repository
+#     enable_push (bool): Whether to allow blob uploads and artifact updates
+#
+def create_timeout_server(repo, *, enable_push):
+    cas = TimeoutCasCache(os.path.abspath(repo))
+
+    # Use max_workers default from Python 3.5+
+    max_workers = (os.cpu_count() or 1) * 5
+    server = grpc.server(futures.ThreadPoolExecutor(max_workers))
+
+    bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
+        _ByteStreamServicer(cas, enable_push=enable_push), server)
+
+    remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
+        _ContentAddressableStorageServicer(cas, enable_push=enable_push), server)
+
+    remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
+        _CapabilitiesServicer(), server)
+
+    buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
+        _ReferenceStorageServicer(cas, enable_push=enable_push), server)
+
+    return server
+
+
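+# The servicer subclasses below default to the stock casserver behaviour;
+# individual methods can be overridden (as in _ContentAddressableStorageServicer
+# below) to inject delays or errors when exercising client-side timeout handling.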
+class _ByteStreamServicer(casserver._ByteStreamServicer):
+    pass
+
+
+class _ContentAddressableStorageServicer(casserver._ContentAddressableStorageServicer):
+
+    def __init__(self, cas, *, enable_push):
+        self.__read_count = 0
+        super().__init__(cas=cas, enable_push=enable_push)
+
+    def BatchReadBlobs(self, request, context):
+        # Simulate a slow CAS server: delay the first batch read so that a
+        # client using a short request deadline would hit DEADLINE_EXCEEDED.
+        self.__read_count += 1
+        if self.__read_count == 1:
+            time.sleep(5)
+        return super().BatchReadBlobs(request, context)
+
+
+class _CapabilitiesServicer(casserver._CapabilitiesServicer):
+    pass
+
+
+class _ReferenceStorageServicer(casserver._ReferenceStorageServicer):
+    pass