Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:31:18 UTC

[buildstream] 02/02: Deduplicate files in local cache with or without exec rights

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch valentindavid/local-cache-exec-leak-2
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit da80724e93acf662fb1b58b9f34918b91911437a
Author: Valentin David <va...@codethink.co.uk>
AuthorDate: Tue Feb 12 18:38:55 2019 +0100

    Deduplicate files in local cache with or without exec rights
    
    If we introduce an object with execution rights whose content is
    identical to an existing file without execution rights, we should not
    expect the existing file to suddenly gain execution rights. That breaks
    reproducibility and is easy to encounter: for example, install an empty
    file with execution rights, or copy a file from another artifact and
    `chmod +x` it.
---
 buildstream/_cas/cascache.py     | 87 ++++++++++++++++++++++------------------
 buildstream/_cas/casremote.py    |  9 ++++-
 buildstream/_cas/casserver.py    |  2 +-
 tests/frontend/buildcheckout.py  | 33 +++++++++++++++
 tests/testutils/artifactshare.py |  2 +-
 5 files changed, 91 insertions(+), 42 deletions(-)
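
For context: the leak existed because checkouts hardlink to cache objects,
and the old checkout code then chmod'ed the new link when the file node was
executable (see the hunk removing that chmod below). A chmod through any
hardlink mutates the shared inode, i.e. the cache object itself and every
other checkout of it. A minimal standalone sketch of that effect,
independent of BuildStream:

    import os
    import stat
    import tempfile

    # chmod through one hardlink mutates the shared inode, which is why a
    # single CAS object cannot back both a 0o644 and a 0o755 file.
    with tempfile.TemporaryDirectory() as d:
        obj = os.path.join(d, 'object')     # stands in for a CAS object
        dest = os.path.join(d, 'checkout')  # stands in for a checked-out file
        with open(obj, 'w') as f:
            f.write('hello\n')
        os.chmod(obj, 0o644)

        os.link(obj, dest)     # the checkout shares the object's inode
        os.chmod(dest, 0o755)  # "fix up" the checked-out file's mode...

        # ...and the cached object is now executable too: prints 0o755
        print(oct(stat.S_IMODE(os.stat(obj).st_mode)))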

diff --git a/buildstream/_cas/cascache.py b/buildstream/_cas/cascache.py
index 7c7af40..531de3f 100644
--- a/buildstream/_cas/cascache.py
+++ b/buildstream/_cas/cascache.py
@@ -43,9 +43,10 @@ from .casremote import BlobNotFound, _CASBatchRead, _CASBatchUpdate
 #
 class CASCache():
 
-    def __init__(self, path):
+    def __init__(self, path, *, disable_exec=False):
         self.casdir = os.path.join(path, 'cas')
         self.tmpdir = os.path.join(path, 'tmp')
+        self._disable_exec = disable_exec
         os.makedirs(os.path.join(self.casdir, 'refs', 'heads'), exist_ok=True)
         os.makedirs(os.path.join(self.casdir, 'objects'), exist_ok=True)
         os.makedirs(self.tmpdir, exist_ok=True)
@@ -340,8 +341,12 @@ class CASCache():
     # Returns:
     #     (str): The path of the object
     #
-    def objpath(self, digest):
-        return os.path.join(self.casdir, 'objects', digest.hash[:2], digest.hash[2:])
+    def objpath(self, digest, *, is_exec=False):
+        if is_exec and not self._disable_exec:
+            filename = '{}.exec'.format(digest.hash[2:])
+        else:
+            filename = digest.hash[2:]
+        return os.path.join(self.casdir, 'objects', digest.hash[:2], filename)
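
With the default disable_exec=False, the same content hash now maps to two
sibling object files, one per mode. A quick illustration, assuming 'cas' is
a CASCache instance and using a made-up digest:

    from types import SimpleNamespace

    # Hypothetical digest value, just to show the layout objpath() produces;
    # objpath() only reads digest.hash here.
    digest = SimpleNamespace(hash='abcdef1234', size_bytes=6)

    print(cas.objpath(digest))                # .../objects/ab/cdef1234
    print(cas.objpath(digest, is_exec=True))  # .../objects/ab/cdef1234.exec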
 
     # add_object():
     #
@@ -358,7 +363,7 @@ class CASCache():
     #
     # Either `path` or `buffer` must be passed, but not both.
     #
-    def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False):
+    def add_object(self, *, digest=None, path=None, buffer=None, link_directly=False, is_exec=False):
         # Exactly one of the two parameters has to be specified
         assert (path is None) != (buffer is None)
 
@@ -374,8 +379,7 @@ class CASCache():
                     for chunk in iter(lambda: tmp.read(4096), b""):
                         h.update(chunk)
                 else:
-                    tmp = stack.enter_context(self._temporary_object())
-
+                    tmp = stack.enter_context(self._temporary_object(is_exec=is_exec))
                     if path:
                         with open(path, 'rb') as f:
                             for chunk in iter(lambda: f.read(4096), b""):
@@ -391,7 +395,7 @@ class CASCache():
                 digest.size_bytes = os.fstat(tmp.fileno()).st_size
 
                 # Place file at final location
-                objpath = self.objpath(digest)
+                objpath = self.objpath(digest, is_exec=is_exec)
                 os.makedirs(os.path.dirname(objpath), exist_ok=True)
                 os.link(tmp.name, objpath)
 
@@ -600,11 +604,7 @@ class CASCache():
         for filenode in directory.files:
             # regular file, create hardlink
             fullpath = os.path.join(dest, filenode.name)
-            os.link(self.objpath(filenode.digest), fullpath)
-
-            if filenode.is_executable:
-                os.chmod(fullpath, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR |
-                         stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
+            os.link(self.objpath(filenode.digest, is_exec=filenode.is_executable), fullpath)
 
         for dirnode in directory.directories:
             # Don't try to checkout a dangling ref
@@ -696,8 +696,8 @@ class CASCache():
             elif stat.S_ISREG(mode):
                 filenode = directory.files.add()
                 filenode.name = name
-                self.add_object(path=full_path, digest=filenode.digest)
                 filenode.is_executable = (mode & stat.S_IXUSR) == stat.S_IXUSR
+                self.add_object(path=full_path, digest=filenode.digest, is_exec=filenode.is_executable)
             elif stat.S_ISLNK(mode):
                 symlinknode = directory.symlinks.add()
                 symlinknode.name = name
@@ -796,7 +796,7 @@ class CASCache():
 
         for filenode in directory.files:
             if update_mtime:
-                os.utime(self.objpath(filenode.digest))
+                os.utime(self.objpath(filenode.digest, is_exec=filenode.is_executable))
             reachable.add(filenode.digest.hash)
 
         for dirnode in directory.directories:
@@ -807,7 +807,7 @@ class CASCache():
         d = remote_execution_pb2.Digest()
         d.hash = directory_digest.hash
         d.size_bytes = directory_digest.size_bytes
-        yield d
+        yield False, d
 
         directory = remote_execution_pb2.Directory()
 
@@ -818,7 +818,7 @@ class CASCache():
             d = remote_execution_pb2.Digest()
             d.hash = filenode.digest.hash
             d.size_bytes = filenode.digest.size_bytes
-            yield d
+            yield filenode.is_executable, d
 
         for dirnode in directory.directories:
             yield from self._required_blobs(dirnode.digest)
@@ -830,10 +830,12 @@ class CASCache():
     #
     # Create a named temporary file with 0o0644 access rights.
     @contextlib.contextmanager
-    def _temporary_object(self):
+    def _temporary_object(self, *, is_exec=False):
         with utils._tempnamedfile(dir=self.tmpdir) as f:
-            os.chmod(f.name,
-                     stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
+            access = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
+            if is_exec and not self._disable_exec:
+                access |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
+            os.chmod(f.name, access)
             yield f
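
The bit arithmetic above produces the two conventional modes; a quick check
with the same stat constants:

    import stat

    base = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
    assert base == 0o644
    assert base | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH == 0o755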
 
     # _ensure_blob():
@@ -847,27 +849,27 @@ class CASCache():
     # Returns:
     #     (str): The path of the object
     #
-    def _ensure_blob(self, remote, digest):
-        objpath = self.objpath(digest)
+    def _ensure_blob(self, remote, digest, is_exec=False):
+        objpath = self.objpath(digest, is_exec=is_exec)
         if os.path.exists(objpath):
             # already in local repository
             return objpath
 
-        with self._temporary_object() as f:
+        with self._temporary_object(is_exec=is_exec) as f:
             remote._fetch_blob(digest, f)
 
-            added_digest = self.add_object(path=f.name, link_directly=True)
+            added_digest = self.add_object(path=f.name, link_directly=True, is_exec=is_exec)
             assert added_digest.hash == digest.hash
 
         return objpath
 
     def _batch_download_complete(self, batch):
-        for digest, data in batch.send():
-            with self._temporary_object() as f:
+        for digest, data, is_exec in batch.send():
+            with self._temporary_object(is_exec=is_exec) as f:
                 f.write(data)
                 f.flush()
 
-                added_digest = self.add_object(path=f.name, link_directly=True)
+                added_digest = self.add_object(path=f.name, link_directly=True, is_exec=is_exec)
                 assert added_digest.hash == digest.hash
 
     # Helper function for _fetch_directory().
@@ -881,8 +883,9 @@ class CASCache():
         return _CASBatchRead(remote)
 
     # Helper function for _fetch_directory().
-    def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue, *, recursive=False):
-        in_local_cache = os.path.exists(self.objpath(digest))
+    def _fetch_directory_node(self, remote, digest, batch, fetch_queue, fetch_next_queue,
+                              *, recursive=False, is_exec=False):
+        in_local_cache = os.path.exists(self.objpath(digest, is_exec=is_exec))
 
         if in_local_cache:
             # Skip download, already in local cache.
@@ -890,14 +893,14 @@ class CASCache():
         elif (digest.size_bytes >= remote.max_batch_total_size_bytes or
               not remote.batch_read_supported):
             # Too large for batch request, download in independent request.
-            self._ensure_blob(remote, digest)
+            self._ensure_blob(remote, digest, is_exec=is_exec)
             in_local_cache = True
         else:
-            if not batch.add(digest):
+            if not batch.add(digest, is_exec=is_exec):
                 # Not enough space left in batch request.
                 # Complete pending batch first.
                 batch = self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
-                batch.add(digest)
+                batch.add(digest, is_exec=is_exec)
 
         if recursive:
             if in_local_cache:
@@ -945,11 +948,13 @@ class CASCache():
             for dirnode in directory.directories:
                 if dirnode.name not in excluded_subdirs:
                     batch = self._fetch_directory_node(remote, dirnode.digest, batch,
-                                                       fetch_queue, fetch_next_queue, recursive=True)
+                                                       fetch_queue, fetch_next_queue,
+                                                       recursive=True)
 
             for filenode in directory.files:
                 batch = self._fetch_directory_node(remote, filenode.digest, batch,
-                                                   fetch_queue, fetch_next_queue)
+                                                   fetch_queue, fetch_next_queue,
+                                                   is_exec=filenode.is_executable)
 
         # Fetch final batch
         self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
@@ -971,7 +976,7 @@ class CASCache():
             tree.children.extend([tree.root])
             for directory in tree.children:
                 for filenode in directory.files:
-                    self._ensure_blob(remote, filenode.digest)
+                    self._ensure_blob(remote, filenode.digest, is_exec=filenode.is_executable)
 
                 # place directory blob only in final location when we've downloaded
                 # all referenced blobs to avoid dangling references in the repository
@@ -984,22 +989,28 @@ class CASCache():
     def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
         required_blobs = self._required_blobs(digest)
 
+        executable = {}
+
         missing_blobs = dict()
         # Limit size of FindMissingBlobs request
         for required_blobs_group in _grouper(required_blobs, 512):
             request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)
 
-            for required_digest in required_blobs_group:
+            for is_exec, required_digest in required_blobs_group:
                 d = request.blob_digests.add()
                 d.hash = required_digest.hash
                 d.size_bytes = required_digest.size_bytes
+                if required_digest.hash not in executable:
+                    executable[required_digest.hash] = set()
+                executable[required_digest.hash].add(is_exec)
 
             response = remote.cas.FindMissingBlobs(request)
             for missing_digest in response.missing_blob_digests:
                 d = remote_execution_pb2.Digest()
                 d.hash = missing_digest.hash
                 d.size_bytes = missing_digest.size_bytes
-                missing_blobs[d.hash] = d
+                for is_exec in executable[missing_digest.hash]:
+                    missing_blobs[d.hash] = (is_exec, d)
 
         # Upload any blobs missing on the server
         self._send_blobs(remote, missing_blobs.values(), u_uid)
@@ -1007,8 +1018,8 @@ class CASCache():
     def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
         batch = _CASBatchUpdate(remote)
 
-        for digest in digests:
-            with open(self.objpath(digest), 'rb') as f:
+        for is_exec, digest in digests:
+            with open(self.objpath(digest, is_exec=is_exec), 'rb') as f:
                 assert os.fstat(f.fileno()).st_size == digest.size_bytes
 
                 if (digest.size_bytes >= remote.max_batch_total_size_bytes or
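
A subtlety in _send_directory() above: _required_blobs() now yields
(is_exec, digest) pairs, and the same hash may legitimately be required
both with and without execution rights, so the 'executable' map accumulates
a set of modes per hash before resolving missing blobs back to local object
variants. A small sketch of that accumulation pattern, with made-up hashes
and using setdefault() in place of the explicit membership test:

    # Accumulate the set of exec modes seen for each content hash.
    pairs = [(False, 'aa11'), (True, 'aa11'), (False, 'bb22')]  # (is_exec, hash)

    executable = {}
    for is_exec, h in pairs:
        executable.setdefault(h, set()).add(is_exec)

    print(executable)  # {'aa11': {False, True}, 'bb22': {False}}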
diff --git a/buildstream/_cas/casremote.py b/buildstream/_cas/casremote.py
index 56ba4c5..0acd465 100644
--- a/buildstream/_cas/casremote.py
+++ b/buildstream/_cas/casremote.py
@@ -306,8 +306,9 @@ class _CASBatchRead():
         self._request = remote_execution_pb2.BatchReadBlobsRequest()
         self._size = 0
         self._sent = False
+        self._is_exec = {}
 
-    def add(self, digest):
+    def add(self, digest, *, is_exec=False):
         assert not self._sent
 
         new_batch_size = self._size + digest.size_bytes
@@ -319,6 +320,9 @@ class _CASBatchRead():
         request_digest.hash = digest.hash
         request_digest.size_bytes = digest.size_bytes
         self._size = new_batch_size
+        if digest.hash not in self._is_exec:
+            self._is_exec[digest.hash] = set()
+        self._is_exec[digest.hash].add(is_exec)
         return True
 
     def send(self):
@@ -341,7 +345,8 @@ class _CASBatchRead():
                 raise CASRemoteError("Failed to download blob {}: expected {} bytes, received {} bytes".format(
                     response.digest.hash, response.digest.size_bytes, len(response.data)))
 
-            yield (response.digest, response.data)
+            for is_exec in self._is_exec[response.digest.hash]:
+                yield (response.digest, response.data, is_exec)
 
 
 # Represents a batch of blobs queued for upload.
diff --git a/buildstream/_cas/casserver.py b/buildstream/_cas/casserver.py
index 5482dae..41baede 100644
--- a/buildstream/_cas/casserver.py
+++ b/buildstream/_cas/casserver.py
@@ -61,7 +61,7 @@ class ArtifactTooLargeException(Exception):
 def create_server(repo, *, enable_push,
                   max_head_size=int(10e9),
                   min_head_size=int(2e9)):
-    cas = CASCache(os.path.abspath(repo))
+    cas = CASCache(os.path.abspath(repo), disable_exec=True)
 
     # Use max_workers default from Python 3.5+
     max_workers = (os.cpu_count() or 1) * 5
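
The server passes disable_exec=True because the CAS wire protocol identifies
blobs by digest alone: only the local cache needs the mode-distinguished twin
objects, while the server keeps a single copy per hash. With that flag set,
objpath() ignores is_exec entirely, along these lines (hypothetical repo
path, reusing the made-up digest from the objpath sketch above):

    server_cas = CASCache('/srv/repo', disable_exec=True)
    assert server_cas.objpath(digest) == server_cas.objpath(digest, is_exec=True)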
diff --git a/tests/frontend/buildcheckout.py b/tests/frontend/buildcheckout.py
index 80d710f..4875f02 100644
--- a/tests/frontend/buildcheckout.py
+++ b/tests/frontend/buildcheckout.py
@@ -2,6 +2,8 @@ import os
 import tarfile
 import hashlib
 import pytest
+import shutil
+import stat
 import subprocess
 from tests.testutils.site import IS_WINDOWS
 from tests.testutils import create_repo, ALL_REPO_KINDS, generate_junction
@@ -709,3 +711,34 @@ def test_build_checkout_cross_junction(datafiles, cli, tmpdir):
 
     filename = os.path.join(checkout, 'etc', 'animal.conf')
     assert os.path.exists(filename)
+
+
+@pytest.mark.datafiles(DATA_DIR)
+def test_access_rights(datafiles, cli):
+    project = str(datafiles)
+    checkout = os.path.join(cli.directory, 'checkout')
+
+    shutil.copyfile(os.path.join(project, 'files', 'bin-files', 'usr', 'bin', 'hello'),
+                    os.path.join(project, 'files', 'bin-files', 'usr', 'bin', 'hello-2'))
+    os.chmod(os.path.join(project, 'files', 'bin-files', 'usr', 'bin', 'hello'),
+             0o0755)
+    os.chmod(os.path.join(project, 'files', 'bin-files', 'usr', 'bin', 'hello-2'),
+             0o0644)
+
+    result = cli.run(project=project, args=['build', 'target.bst'])
+    result.assert_success()
+
+    checkout_args = ['artifact', 'checkout', 'target.bst',
+                     '--directory', checkout]
+
+    # Now check it out
+    result = cli.run(project=project, args=checkout_args)
+    result.assert_success()
+
+    st = os.lstat(os.path.join(checkout, 'usr', 'bin', 'hello'))
+    assert stat.S_ISREG(st.st_mode)
+    assert stat.S_IMODE(st.st_mode) == 0o0755
+
+    st = os.lstat(os.path.join(checkout, 'usr', 'bin', 'hello-2'))
+    assert stat.S_ISREG(st.st_mode)
+    assert stat.S_IMODE(st.st_mode) == 0o0644
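
To run just the new test from a development checkout (assuming the usual
test dependencies are installed), something along these lines should work:

    python3 -m pytest tests/frontend/buildcheckout.py::test_access_rights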
diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py
index 6b03d8d..e8ed99b 100644
--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -49,7 +49,7 @@ class ArtifactShare():
 
         os.makedirs(self.repodir)
 
-        self.cas = CASCache(self.repodir)
+        self.cas = CASCache(self.repodir, disable_exec=True)
 
         self.total_space = total_space
         self.free_space = free_space