Posted to commits@buildstream.apache.org by gi...@apache.org on 2020/12/29 13:24:37 UTC

[buildstream] branch finn/cas-error-timeouts created (now 4bebfa0)

This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch finn/cas-error-timeouts
in repository https://gitbox.apache.org/repos/asf/buildstream.git.


      at 4bebfa0  working copy

This branch includes the following new commits:

     new 4bebfa0  working copy

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[buildstream] 01/01: working copy

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch finn/cas-error-timeouts
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 4bebfa0474bb49346f83e66f7688311ca4cbc9a8
Author: Finn <fi...@codethink.co.uk>
AuthorDate: Wed Nov 28 13:50:43 2018 +0000

    working copy
---
 tests/artifactcache/pull.py      |  80 ++++++++++++-
 tests/testutils/__init__.py      |   1 +
 tests/testutils/timeoutserver.py | 236 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 316 insertions(+), 1 deletion(-)

diff --git a/tests/artifactcache/pull.py b/tests/artifactcache/pull.py
index 4c332bf..bec21d1 100644
--- a/tests/artifactcache/pull.py
+++ b/tests/artifactcache/pull.py
@@ -10,7 +10,7 @@ from buildstream._context import Context
 from buildstream._project import Project
 from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 
-from tests.testutils import cli, create_artifact_share
+from tests.testutils import cli, create_artifact_share, create_timeout_artifact_share
 
 
 # Project directory
@@ -327,3 +327,81 @@ def _test_pull_tree(user_config_file, project_dir, artifact_dir, artifact_digest
         queue.put((directory_digest.hash, directory_digest.size_bytes))
     else:
         queue.put("No remote configured")
+
+
+@pytest.mark.datafiles(DATA_DIR)
+def test_timeout_pull(cli, tmpdir, datafiles):
+    project_dir = str(datafiles)
+
+    # Set up an artifact cache.
+    with create_timeout_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
+        # Configure artifact share
+        artifact_dir = os.path.join(str(tmpdir), 'cache', 'artifacts')
+        user_config_file = str(tmpdir.join('buildstream.conf'))
+        user_config = {
+            'scheduler': {
+                'pushers': 1
+            },
+            'artifacts': {
+                'url': share.repo,
+                'push': True,
+            }
+        }
+
+        # Write down the user configuration file
+        _yaml.dump(_yaml.node_sanitize(user_config), filename=user_config_file)
+        # Ensure CLI calls will use it
+        cli.configure(user_config)
+
+        # First build the project with the artifact cache configured
+        result = cli.run(project=project_dir, args=['build', 'target.bst'])
+        result.assert_success()
+
+        # Assert that we are now cached locally
+        assert cli.get_element_state(project_dir, 'target.bst') == 'cached'
+        # Assert that we shared/pushed the cached artifact
+        element_key = cli.get_element_key(project_dir, 'target.bst')
+        assert share.has_artifact('test', 'target.bst', element_key)
+
+        # Delete the artifact locally
+        cli.remove_artifact_from_cache(project_dir, 'target.bst')
+
+        # Assert that we are not cached locally anymore
+        assert cli.get_element_state(project_dir, 'target.bst') != 'cached'
+
+        # Fake minimal context
+        context = Context()
+        context.load(config=user_config_file)
+        context.artifactdir = os.path.join(str(tmpdir), 'cache', 'artifacts')
+        context.set_message_handler(message_handler)
+
+        # Load the project and CAS cache
+        project = Project(project_dir, context)
+        project.ensure_fully_loaded()
+        cas = context.artifactcache
+
+        # Assert that the element's artifact is **not** cached
+        element = project.load_elements(['target.bst'])[0]
+        element_key = cli.get_element_key(project_dir, 'target.bst')
+        assert not cas.contains(element, element_key)
+
+        queue = multiprocessing.Queue()
+        # Use subprocess to avoid creation of gRPC threads in main BuildStream process
+        # See https://github.com/grpc/grpc/blob/master/doc/fork_support.md for details
+        process = multiprocessing.Process(target=_test_pull,
+                                          args=(user_config_file, project_dir, artifact_dir,
+                                                'target.bst', element_key, queue))
+
+        try:
+            # Keep SIGINT blocked in the child process
+            with _signals.blocked([signal.SIGINT], ignore=False):
+                process.start()
+
+            error = queue.get()
+            process.join()
+        except KeyboardInterrupt:
+            utils._kill_process_tree(process.pid)
+            raise
+
+        assert not error
+        assert cas.contains(element, element_key)
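
The test above follows the existing pattern in pull.py of doing the actual pull in a
child process, because gRPC threads created in the main BuildStream process do not
survive fork() (see the fork_support.md link in the comment). A minimal, standalone
sketch of that pattern is below; it uses only the standard library, with
signal.pthread_sigmask standing in for BuildStream's internal _signals.blocked()
helper and _worker as a hypothetical stand-in for _test_pull():

    import multiprocessing
    import signal


    def _worker(queue):
        # Hypothetical stand-in for _test_pull(): do the gRPC work here and
        # report an error string (or None on success) back through the queue.
        queue.put(None)


    def run_pull_in_subprocess():
        queue = multiprocessing.Queue()
        process = multiprocessing.Process(target=_worker, args=(queue,))

        # Block SIGINT before start() so the child inherits it blocked, then
        # restore it in the parent; BuildStream wraps this in _signals.blocked().
        signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT})
        try:
            process.start()
        finally:
            signal.pthread_sigmask(signal.SIG_UNBLOCK, {signal.SIGINT})

        error = queue.get()
        process.join()
        return error
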
diff --git a/tests/testutils/__init__.py b/tests/testutils/__init__.py
index eb7211e..6b64183 100644
--- a/tests/testutils/__init__.py
+++ b/tests/testutils/__init__.py
@@ -30,3 +30,4 @@ from .element_generators import create_element_size, update_element_size
 from .junction import generate_junction
 from .runner_integration import wait_for_cache_granularity
 from .python_repo import setup_pypi_repo
+from .timeoutserver import create_timeout_artifact_share
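
With this export in place, tests can consume the new helper the same way they already
use create_artifact_share(). A minimal usage sketch (the tmpdir argument and the
'artifactshare' subdirectory name are illustrative):

    import os

    from tests.testutils import create_timeout_artifact_share


    def example(tmpdir):
        # The context manager starts a local CAS server in a subprocess and,
        # on exit, terminates that process and removes the share directory.
        with create_timeout_artifact_share(os.path.join(str(tmpdir), 'artifactshare')) as share:
            # share.repo is an http://localhost:<port> URL that can be used
            # as the 'artifacts' url in a user configuration.
            print(share.repo)
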
diff --git a/tests/testutils/timeoutserver.py b/tests/testutils/timeoutserver.py
new file mode 100644
index 0000000..3cc4aed
--- /dev/null
+++ b/tests/testutils/timeoutserver.py
@@ -0,0 +1,236 @@
+import string
+import pytest
+import subprocess
+import os
+import shutil
+import signal
+from collections import namedtuple
+from concurrent import futures
+
+from contextlib import contextmanager
+from multiprocessing import Process, Queue
+import grpc
+import pytest_cov
+
+from buildstream import _yaml
+from buildstream._artifactcache.cascache import CASCache
+from buildstream._artifactcache import casserver
+from buildstream._exceptions import CASError
+from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
+from buildstream._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
+from buildstream._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
+
+
+class TimeoutCasCache(CASCache):
+    pass
+
+
+# TimeoutArtifactShare()
+#
+# A local artifact server for tests: it wraps a CAS repository and serves
+# it over gRPC so that slow or timing-out responses can be simulated.
+#
+# Args:
+#    directory (str): The base temp directory for the test
+#    total_space (int): Mock total disk space on artifact server
+#    free_space (int): Mock free disk space on artifact server
+#
+class TimeoutArtifactShare():
+
+    def __init__(self, directory, *, total_space=None, free_space=None):
+
+        # The working directory for the artifact share (in case it
+        # needs to do something outside of its backend's storage folder).
+        #
+        self.directory = os.path.abspath(directory)
+
+        # The directory the actual repo will be stored in.
+        #
+        # Unless this gets more complicated, just use this directly
+        # in tests as a remote artifact push/pull configuration
+        #
+        self.repodir = os.path.join(self.directory, 'repo')
+
+        os.makedirs(self.repodir)
+
+        self.cas = CASCache(self.repodir)
+
+        self.total_space = total_space
+        self.free_space = free_space
+
+        q = Queue()
+
+        self.process = Process(target=self.run, args=(q,))
+        self.process.start()
+
+        # Retrieve port from server subprocess
+        port = q.get(timeout=1)
+
+        self.repo = 'http://localhost:{}'.format(port)
+
+    # run():
+    #
+    # Run the artifact server.
+    #
+    def run(self, q):
+        pytest_cov.embed.cleanup_on_sigterm()
+
+        # Optionally mock statvfs
+        if self.total_space:
+            if self.free_space is None:
+                self.free_space = self.total_space
+            os.statvfs = self._mock_statvfs
+
+        server = create_timeout_server(self.repodir, enable_push=True)
+        port = server.add_insecure_port('localhost:0')
+
+        server.start()
+
+        # Send port to parent
+        q.put(port)
+
+        # Sleep until termination by signal
+        signal.pause()
+
+    # has_object():
+    #
+    # Checks whether the object is present in the share
+    #
+    # Args:
+    #    digest (str): The object's digest
+    #
+    # Returns:
+    #    (bool): True if the object exists in the share, otherwise false.
+    def has_object(self, digest):
+
+        assert isinstance(digest, remote_execution_pb2.Digest)
+
+        object_path = self.cas.objpath(digest)
+
+        return os.path.exists(object_path)
+
+    # has_artifact():
+    #
+    # Checks whether the artifact is present in the share
+    #
+    # Args:
+    #    project_name (str): The project name
+    #    element_name (str): The element name
+    #    cache_key (str): The cache key
+    #
+    # Returns:
+    #    (str): artifact digest if the artifact exists in the share, otherwise None.
+    def has_artifact(self, project_name, element_name, cache_key):
+
+        # NOTE: This should be kept in line with our
+        #       artifact cache code, the below is the
+        #       same algo for creating an artifact reference
+        #
+
+        # Replace path separator and chop off the .bst suffix
+        element_name = os.path.splitext(element_name.replace(os.sep, '-'))[0]
+
+        valid_chars = string.digits + string.ascii_letters + '-._'
+        element_name = ''.join([
+            x if x in valid_chars else '_'
+            for x in element_name
+        ])
+        artifact_key = '{0}/{1}/{2}'.format(project_name, element_name, cache_key)
+
+        try:
+            tree = self.cas.resolve_ref(artifact_key)
+            return tree
+        except CASError:
+            return None
+
+    # close():
+    #
+    # Remove the artifact share.
+    #
+    def close(self):
+        self.process.terminate()
+        self.process.join()
+
+        shutil.rmtree(self.directory)
+
+    def _mock_statvfs(self, path):
+        repo_size = 0
+        for root, _, files in os.walk(self.repodir):
+            for filename in files:
+                repo_size += os.path.getsize(os.path.join(root, filename))
+
+        return statvfs_result(f_blocks=self.total_space,
+                              f_bfree=self.free_space - repo_size,
+                              f_bavail=self.free_space - repo_size,
+                              f_bsize=1)
+
+
+# create_timeout_artifact_share()
+#
+# Create a TimeoutArtifactShare for use in a test case
+#
+@contextmanager
+def create_timeout_artifact_share(directory, *, total_space=None, free_space=None):
+    share = TimeoutArtifactShare(directory, total_space=total_space, free_space=free_space)
+    try:
+        yield share
+    finally:
+        share.close()
+
+
+statvfs_result = namedtuple('statvfs_result', 'f_blocks f_bfree f_bsize f_bavail')
+
+
+# create_timeout_server():
+#
+# Create gRPC CAS artifact server as specified in the Remote Execution API.
+#
+# Args:
+#     repo (str): Path to CAS repository
+#     enable_push (bool): Whether to allow blob uploads and artifact updates
+#
+def create_timeout_server(repo, *, enable_push):
+    cas = TimeoutCasCache(os.path.abspath(repo))
+
+    # Use max_workers default from Python 3.5+
+    max_workers = (os.cpu_count() or 1) * 5
+    server = grpc.server(futures.ThreadPoolExecutor(max_workers))
+
+    bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
+        _ByteStreamServicer(cas, enable_push=enable_push), server)
+
+    remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
+        _ContentAddressableStorageServicer(cas, enable_push=enable_push), server)
+
+    remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
+        _CapabilitiesServicer(), server)
+
+    buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
+        _ReferenceStorageServicer(cas, enable_push=enable_push), server)
+
+    return server
+
+
+class _ByteStreamServicer(casserver._ByteStreamServicer):
+    pass
+
+
+class _ContentAddressableStorageServicer(casserver._ContentAddressableStorageServicer):
+
+    def __init__(self, cas, *, enable_push):
+        self.__read_count = 0
+        super().__init__(cas=cas, enable_push=enable_push)
+
+    def BatchReadBlobs(self, request, context):
+        # Placeholder hook for simulating slow CAS reads (currently disabled):
+        # self.__read_count += 1
+        # import time; time.sleep(5)
+        return super().BatchReadBlobs(request, context)
+
+
+class _CapabilitiesServicer(casserver._CapabilitiesServicer):
+    pass
+
+
+class _ReferenceStorageServicer(casserver._ReferenceStorageServicer):
+    pass
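
For reference, the artifact key construction in has_artifact() above mirrors
BuildStream's own reference naming. A small worked example of that mangling
(the element name and cache key values are made up):

    import os
    import string


    def artifact_key(project_name, element_name, cache_key):
        # Replace path separators, drop the .bst suffix, and substitute any
        # character outside [A-Za-z0-9-._] with '_', as has_artifact() does.
        element_name = os.path.splitext(element_name.replace(os.sep, '-'))[0]
        valid_chars = string.digits + string.ascii_letters + '-._'
        element_name = ''.join(x if x in valid_chars else '_' for x in element_name)
        return '{0}/{1}/{2}'.format(project_name, element_name, cache_key)

    # e.g. artifact_key('test', 'compose/target.bst', 'abc123')
    #      -> 'test/compose-target/abc123'
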
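
Finally, the disabled lines in BatchReadBlobs() are the hook this branch
("cas-error-timeouts") exists for. One way the hook might be activated is sketched
below: it delays only the first few reads so that a client with a short gRPC deadline
fails with DEADLINE_EXCEEDED and can succeed on a later retry. The class name and the
slow_reads and delay values are illustrative and not part of the patch:

    import time

    from buildstream._artifactcache import casserver


    class _SlowCASServicer(casserver._ContentAddressableStorageServicer):

        def __init__(self, cas, *, enable_push, slow_reads=2, delay=5):
            super().__init__(cas=cas, enable_push=enable_push)
            self._read_count = 0
            self._slow_reads = slow_reads   # number of initial reads to delay
            self._delay = delay             # seconds to sleep per delayed read

        def BatchReadBlobs(self, request, context):
            # Stall the first few reads so a client-side deadline expires,
            # then answer later retries normally.
            self._read_count += 1
            if self._read_count <= self._slow_reads:
                time.sleep(self._delay)
            return super().BatchReadBlobs(request, context)
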