You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by gi...@apache.org on 2020/12/29 13:13:32 UTC

[buildstream] 11/43: Add code necessary to do cas-to-cas import

This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch jmac/cas_to_cas_oct_v2
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 9a24fd164edd58967de0a7c17b6379f2c592a817
Author: Jim MacArthur <ji...@codethink.co.uk>
AuthorDate: Fri Oct 19 17:43:08 2018 +0100

    Add code necessary to do cas-to-cas import
---
 buildstream/storage/_casbaseddirectory.py | 246 ++++++++++++++++++++++++++++--
 tests/storage/virtual_directory_import.py |   3 +-
 2 files changed, 235 insertions(+), 14 deletions(-)

diff --git a/buildstream/storage/_casbaseddirectory.py b/buildstream/storage/_casbaseddirectory.py
index 286d672..69a3608 100644
--- a/buildstream/storage/_casbaseddirectory.py
+++ b/buildstream/storage/_casbaseddirectory.py
@@ -137,6 +137,41 @@ class CasBasedDirectory(Directory):
         # We don't need to do anything more than that; files were already added ealier, and symlinks are
         # part of the directory structure.
 
+    def _add_new_blank_directory(self, name) -> Directory:
+        bst_dir = CasBasedDirectory(self.context, parent=self, filename=name)
+        new_pb2_dirnode = self.pb2_directory.directories.add()
+        new_pb2_dirnode.name = name
+        # Calculate the hash for an empty directory
+        if name in self.index:
+            raise VirtualDirectoryError("Creating directory {} would overwrite an existing item in {}"
+                                        .format(name, str(self)))
+        new_pb2_directory = remote_execution_pb2.Directory()
+        self.cas_cache.add_object(digest=new_pb2_dirnode.digest, buffer=new_pb2_directory.SerializeToString())
+        self.index[name] = IndexEntry(new_pb2_dirnode, buildstream_object=bst_dir)
+        return bst_dir
+
+    def create_directory(self, name: str) -> Directory:
+        """Creates a directory if it does not already exist. This does not
+        cause an error if something exists; it will remove files and
+        symlinks to files which have the same name in this
+        directory. Symlinks to directories with the name 'name' are
+        unaltered; it's assumed that the target of that symlink will
+        be used.
+
+        """
+        existing_item = self._find_pb2_entry(name)
+        if isinstance(existing_item, remote_execution_pb2.FileNode):
+            # Directory imported over file with same name
+            self.remove_item(name)
+        elif isinstance(existing_item, remote_execution_pb2.SymlinkNode):
+            # Directory imported over symlink with same source name
+            if self.symlink_target_is_directory(existing_item):
+                return self._resolve_symlink_or_directory(name) # That's fine; any files in the source directory should end up at the target of the symlink.
+            else:
+                self.remove_item(name) # Symlinks to files get replaced
+        return self.descend(name, create=True) # Creates the directory if it doesn't already exist.
+
+
     def _find_pb2_entry(self, name):
         if name in self.index:
             return self.index[name].pb_object
@@ -233,6 +268,7 @@ class CasBasedDirectory(Directory):
             if isinstance(entry, CasBasedDirectory):
                 return entry.descend(subdirectory_spec[1:], create)
             else:
+                # May be a symlink
                 error = "Cannot descend into {}, which is a '{}' in the directory {}"
                 raise VirtualDirectoryError(error.format(subdirectory_spec[0],
                                                          type(entry).__name__,
@@ -289,6 +325,29 @@ class CasBasedDirectory(Directory):
                 directory = directory.descend(c, create=True)
         return directory
 
+    def _resolve_symlink(self, node):
+        """Same as _resolve_symlink_or_directory but takes a SymlinkNode.
+        """
+
+        # OK then, it's a symlink
+        symlink = node
+        absolute = symlink.target.startswith(CasBasedDirectory._pb2_absolute_path_prefix)
+        if absolute:
+            root = self.find_root()
+        else:
+            root = self
+        directory = root
+        components = symlink.target.split(CasBasedDirectory._pb2_path_sep)
+        for c in components:
+            if c == ".":
+                pass
+            elif c == "..":
+                directory = directory.parent
+            else:
+                directory = directory.descend(c, create=True)
+        return directory
+
+    
     def _resolve(self, name, absolute_symlinks_resolve=True):
         """ Resolves any name to an object. If the name points to a symlink in this 
         directory, it returns the thing it points to, recursively. Returns a CasBasedDirectory, FileNode or None. Never creates a directory or otherwise alters the directory. """
@@ -428,6 +487,157 @@ class CasBasedDirectory(Directory):
                     result.files_written.append(relative_pathname)
         return result
 
+
+    def _save(self, name):
+        """ Saves this directory into the content cache as a named ref. This function is not
+        currently in use, but may be useful later. """
+        self._recalculate_recursing_up()
+        self._recalculate_recursing_down()
+        (rel_refpath, refname) = os.path.split(name)
+        refdir = os.path.join(self.cas_directory, 'refs', 'heads', rel_refpath)
+        refname = os.path.join(refdir, refname)
+
+        if not os.path.exists(refdir):
+            os.makedirs(refdir)
+        with open(refname, "wb") as f:
+            f.write(self.ref.SerializeToString())
+
+    def find_updated_files(self, modified_directory, prefix=""):
+        """Find the list of written and overwritten files that would result
+        from importing 'modified_directory' into this one.  This does
+        not change either directory. The reason this exists is for
+        direct imports of cas directories into other ones, which can
+        be done by simply replacing a hash, but we still need the file
+        lists.
+
+        """
+        result = FileListResult()
+        for entry in modified_directory.pb2_directory.directories:
+            existing_dir = self._find_pb2_entry(entry.name)
+            if existing_dir:
+                updates_files = existing_dir.find_updated_files(modified_directory.descend(entry.name),
+                                                                os.path.join(prefix, entry.name))
+                result.combine(updated_files)
+            else:
+                for f in source_directory.descend(entry.name).list_relative_paths():
+                    result.files_written.append(os.path.join(prefix, f))
+                    # None of these can overwrite anything, since the original files don't exist
+        for entry in modified_directory.pb2_directory.files + modified_directory.pb2_directory.symlinks:
+            if self._find_pb2_entry(entry.name):
+                result.files_overwritten.apppend(os.path.join(prefix, entry.name))
+            result.file_written.apppend(os.path.join(prefix, entry.name))
+        return result
+
+    def files_in_subdir(sorted_files, dirname):
+        """Filters sorted_files and returns only the ones which have
+           'dirname' as a prefix, with that prefix removed.
+
+        """
+        if not dirname.endswith(os.path.sep):
+            dirname += os.path.sep
+        return [f[len(dirname):] for f in sorted_files if f.startswith(dirname)]
+
+    def symlink_target_is_directory(self, symlink_node):
+        x = self._resolve_symlink(symlink_node)
+        return isinstance(x, CasBasedDirectory)
+
+    def _partial_import_cas_into_cas(self, source_directory, files, path_prefix="", file_list_required=True):
+        """ Import only the files and symlinks listed in 'files' from source_directory to this one.
+        Args:
+           source_directory (:class:`.CasBasedDirectory`): The directory to import from
+           files ([str]): List of pathnames to import.
+           path_prefix (str): Prefix used to add entries to the file list result.
+           file_list_required: Whether to update the file list while processing.
+        """
+        print("Beginning partial import of {} into {}".format(source_directory, self))
+        result = FileListResult()
+        processed_directories = set()
+        for f in files:
+            if f == ".": continue
+            fullname = os.path.join(path_prefix, f)
+            components = f.split(os.path.sep)
+            if len(components)>1:
+                # We are importing a thing which is in a subdirectory. We may have already seen this dirname
+                # for a previous file.
+                dirname = components[0]
+                if dirname not in processed_directories:
+                    # Now strip off the first directory name and import files recursively.
+                    subcomponents = CasBasedDirectory.files_in_subdir(files, dirname)
+                    self.create_directory(dirname)
+                    print("Creating destination in {}: {}".format(self, dirname))
+                    dest_subdir = self._resolve_symlink_or_directory(dirname)
+                    src_subdir = source_directory.descend(dirname)
+                    import_result = dest_subdir._partial_import_cas_into_cas(src_subdir, subcomponents,
+                                                                             path_prefix=fullname, file_list_required=file_list_required)
+                    result.combine(import_result)
+                processed_directories.add(dirname)
+            elif isinstance(source_directory.index[f].buildstream_object, CasBasedDirectory):
+                # The thing in the input file list is a directory on its own. In which case, replace any existing file, or symlink to file
+                # with the new, blank directory - if it's neither of those things, or doesn't exist, then just create the dir.
+                self.create_directory(f)
+            else:
+                # We're importing a file or symlink - replace anything with the same name.
+                self._check_replacement(f, path_prefix, result)
+                item = source_directory.index[f].pb_object
+                if isinstance(item, remote_execution_pb2.FileNode):
+                    filenode = self.pb2_directory.files.add(digest=item.digest, name=f,
+                                                            is_executable=item.is_executable)
+                    self.index[f] = IndexEntry(filenode, modified=(fullname in result.overwritten))
+                else:
+                    assert(isinstance(item, remote_execution_pb2.SymlinkNode))
+                    symlinknode = self.pb2_directory.symlinks.add(name=f, target=item.target)
+                    # A symlink node has no digest.
+                    self.index[f] = IndexEntry(symlinknode, modified=(fullname in result.overwritten))
+        return result
+
+    def transfer_node_contents(destination, source):
+        """Transfers all fields from the source PB2 node into the
+        destination. Destination and source must be of the same type and must
+        be a FileNode, SymlinkNode or DirectoryNode.
+        """
+        assert(type(destination) == type(source))
+        destination.name = source.name
+        if isinstance(destination, remote_execution_pb2.FileNode):
+            destination.digest.hash = source.digest.hash
+            destination.digest.size_bytes = source.digest.size_bytes
+            destination.is_executable = source.is_executable
+        elif isinstance(destination, remote_execution_pb2.SymlinkNode):
+            destination.target = source.target
+        elif isinstance(destination, remote_execution_pb2.DirectoryNode):
+            destination.digest.hash = source.digest.hash
+            destination.digest.size_bytes = source.digest.size_bytes
+        else:
+            raise VirtualDirectoryError("Incompatible type '{}' used as destination for transfer_node_contents"
+                                        .format(destination.type))
+
+    def _add_directory_from_node(self, source_node, source_casdir, can_hardlink=False):
+        # Duplicate the given node and add it to our index with a CasBasedDirectory object.
+        # No existing entry with the source node's name can exist.
+        # source_casdir is only needed if can_hardlink is True.
+        assert(self._find_pb2_entry(source_node.name) is None)
+
+        if can_hardlink:
+            new_dir_node = self.pb2_directory.directories.add()
+            CasBasedDirectory.transfer_node_contents(new_dir_node, source_node)
+            self.index[source_node.name] = IndexEntry(source_node, buildstream_object=source_casdir, modified=True)
+        else:
+            new_dir_node = self.pb2_directory.directories.add()
+            CasBasedDirectory.transfer_node_contents(new_dir_node, source_node)
+            buildStreamDirectory = CasBasedDirectory(self.context, ref=source_node.digest,
+                                                     parent=self, filename=source_node.name)
+            self.index[source_node.name] = IndexEntry(source_node, buildstream_object=buildStreamDirectory, modified=True)
+
+    def _import_cas_into_cas(self, source_directory, files=None):
+        """ A full import is significantly quicker than a partial import, because we can just
+        replace one directory with another's hash, without doing any recursion.
+        """
+        if files is None:
+            #return self._full_import_cas_into_cas(source_directory, can_hardlink=True)
+            files = source_directory.list_relative_paths()
+            print("Extracted all files from source directory '{}': {}".format(source_directory, files))
+        return self._partial_import_cas_into_cas(source_directory, files)
+
+
     def import_files(self, external_pathspec, *, files=None,
                      report_written=True, update_utimes=False,
                      can_link=False):
@@ -449,28 +659,34 @@ class CasBasedDirectory(Directory):
 
         can_link (bool): Ignored, since hard links do not have any meaning within CAS.
         """
-        if isinstance(external_pathspec, FileBasedDirectory):
-            source_directory = external_pathspec._get_underlying_directory()
-        elif isinstance(external_pathspec, CasBasedDirectory):
-            # TODO: This transfers from one CAS to another via the
-            # filesystem, which is very inefficient. Alter this so it
-            # transfers refs across directly.
+
+        duplicate_cas = None
+        if isinstance(external_pathspec, CasBasedDirectory):
+            result = self._import_cas_into_cas(external_pathspec, files=files)
+
+            # Duplicate the current directory and do an import that way.
+            duplicate_cas = CasBasedDirectory(self.context, ref=self.ref)
             with tempfile.TemporaryDirectory(prefix="roundtrip") as tmpdir:
                 external_pathspec.export_files(tmpdir)
                 if files is None:
                     files = list_relative_paths(tmpdir)
-                result = self._import_files_from_directory(tmpdir, files=files)
-            return result
+                duplicate_cas._import_files_from_directory(tmpdir, files=files)
+                duplicate_cas._recalculate_recursing_down()
+                if duplicate_cas.parent:
+                    duplicate_cas.parent._recalculate_recursing_up(self)
         else:
-            source_directory = external_pathspec
-
-        if files is None:
-            files = list_relative_paths(source_directory)
+            if isinstance(external_pathspec, FileBasedDirectory):
+                source_directory = external_pathspec.get_underlying_directory()
+            else:
+                source_directory = external_pathspec
+            if files is None:
+                files = list_relative_paths(external_pathspec)
+            result = self._import_files_from_directory(source_directory, files=files)
 
         # TODO: No notice is taken of report_written, update_utimes or can_link.
         # Current behaviour is to fully populate the report, which is inefficient,
         # but still correct.
-        result = self._import_files_from_directory(source_directory, files=files)
+
 
         # We need to recalculate and store the hashes of all directories both
         # up and down the tree; we have changed our directory by importing files
@@ -480,6 +696,10 @@ class CasBasedDirectory(Directory):
         self._recalculate_recursing_down()
         if self.parent:
             self.parent._recalculate_recursing_up(self)
+        if duplicate_cas:
+            if duplicate_cas.ref.hash != self.ref.hash:
+                raise VirtualDirectoryError("Mismatch between file-imported result {} and cas-to-cas imported result {}.".format(duplicate_cas.ref.hash,self.ref.hash))
+
         return result
 
     def set_deterministic_mtime(self):
diff --git a/tests/storage/virtual_directory_import.py b/tests/storage/virtual_directory_import.py
index 1c78c1b..47b4935 100644
--- a/tests/storage/virtual_directory_import.py
+++ b/tests/storage/virtual_directory_import.py
@@ -150,9 +150,10 @@ def test_cas_import(cli, tmpdir, original, overlay):
     generate_random_root(tmpdir)
     d = create_new_casdir(original, fake_context, tmpdir)
     d2 = create_new_casdir(overlay, fake_context, tmpdir)
+    print("Importing dir {} into {}".format(overlay, original))
     d.import_files(d2)
     d.export_files(os.path.join(tmpdir, "output"))
-
+    
     for item in root_filesets[overlay - 1]:
         (path, typename, content) = item
         realpath = resolve_symlinks(path, os.path.join(tmpdir, "output"))