You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:30:10 UTC

[buildstream] 02/21: WIP: pickle things

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch aevri/win32
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 05893e1dad07927302974112ec904805029b50cb
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Tue Jun 18 15:42:59 2019 +0100

    WIP: pickle things
---
 src/buildstream/_artifactcache.py      |  6 +++
 src/buildstream/_context.py            |  5 +++
 src/buildstream/_elementfactory.py     |  7 ++-
 src/buildstream/_options/optionpool.py | 12 ++++++
 src/buildstream/_plugincontext.py      | 26 +++++++++--
 src/buildstream/_project.py            | 18 +++++++-
 src/buildstream/_scheduler/jobs/job.py | 79 +++++++++++++++++++++++++++++++++-
 src/buildstream/_sourcecache.py        |  6 +++
 src/buildstream/_sourcefactory.py      |  5 ++-
 src/buildstream/element.py             |  1 +
 src/buildstream/source.py              |  2 +
 11 files changed, 156 insertions(+), 11 deletions(-)

diff --git a/src/buildstream/_artifactcache.py b/src/buildstream/_artifactcache.py
index de17ea7..a4497ba 100644
--- a/src/buildstream/_artifactcache.py
+++ b/src/buildstream/_artifactcache.py
@@ -103,6 +103,12 @@ class ArtifactCache(BaseCache):
         self.cas.add_reachable_directories_callback(self._reachable_directories)
         self.cas.add_reachable_digests_callback(self._reachable_digests)
 
+    def __getstate__(self):
+        state = self.__dict__.copy()  # shallow copy, so the live cache is not modified
+        # TODO: actually pickle the required elements, so they resolve to the same Element objects as the rest of the pickle.
+        state['_required_elements'] = set()  # NOTE(review): the unpickled cache starts with no required elements - confirm the child never relies on them
+        return state
+
     # mark_required_elements():
     #
     # Mark elements whose artifacts are required for the current run.
diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py
index 3f6e6ac..d9ef1f1 100644
--- a/src/buildstream/_context.py
+++ b/src/buildstream/_context.py
@@ -168,6 +168,11 @@ class Context():
         self._casquota = None
         self._directory = directory
 
+    def __getstate__(self):
+        state = self.__dict__.copy()  # shallow copy, so the live context keeps its handler
+        state['_message_handler'] = None  # not pickled; reset (not deleted) so the attribute still exists after unpickling
+        return state
+
     # load()
     #
     # Loads the configuration files
diff --git a/src/buildstream/_elementfactory.py b/src/buildstream/_elementfactory.py
index d6591bf..b2a7f73 100644
--- a/src/buildstream/_elementfactory.py
+++ b/src/buildstream/_elementfactory.py
@@ -33,9 +33,12 @@ class ElementFactory(PluginContext):
 
     def __init__(self, plugin_base, *,
                  format_versions={},
-                 plugin_origins=None):
+                 plugin_origins=None,
+                 pass_=None):
 
-        super().__init__(plugin_base, Element, [_site.element_plugins],
+        assert pass_ is not None  # the owning Project must say which load pass this factory belongs to
+
+        super().__init__(plugin_base, Element, [_site.element_plugins], 'element' + str(pass_),  # e.g. 'element1': distinct plugin identifiers per pass
                          plugin_origins=plugin_origins,
                          format_versions=format_versions)
 
diff --git a/src/buildstream/_options/optionpool.py b/src/buildstream/_options/optionpool.py
index de3af3e..d5dea39 100644
--- a/src/buildstream/_options/optionpool.py
+++ b/src/buildstream/_options/optionpool.py
@@ -56,6 +56,18 @@ class OptionPool():
         self._environment = jinja2.Environment(undefined=jinja2.StrictUndefined)
         self._environment.globals = []
 
+    def __getstate__(self):
+        state = self.__dict__.copy()
+        del state['_environment']  # the jinja2 Environment is not pickled; __setstate__ rebuilds it
+        return state
+
+    def __setstate__(self, state):
+        self.__dict__.update(state)
+
+        # Rebuild the jinja2 environment dropped by __getstate__, with default globals cleared out of the way (same settings as at construction)
+        self._environment = jinja2.Environment(undefined=jinja2.StrictUndefined)
+        self._environment.globals = []
+
     # load()
     #
     # Loads the options described in the project.conf
diff --git a/src/buildstream/_plugincontext.py b/src/buildstream/_plugincontext.py
index 7a5407c..0d322c1 100644
--- a/src/buildstream/_plugincontext.py
+++ b/src/buildstream/_plugincontext.py
@@ -42,10 +42,12 @@ from . import _yaml
 #
 class PluginContext():
 
-    def __init__(self, plugin_base, base_type, site_plugin_path, *,
+    def __init__(self, plugin_base, base_type, site_plugin_path, identifier, *,
                  plugin_origins=None, dependencies=None,
                  format_versions={}):
 
+        self._identifier = identifier
+
         # The plugin kinds which were loaded
         self.loaded_dependencies = []
 
@@ -59,10 +61,26 @@ class PluginContext():
 
         # The PluginSource object
         self._plugin_base = plugin_base
-        self._site_source = plugin_base.make_plugin_source(searchpath=site_plugin_path)
+        self._site_plugin_path = site_plugin_path
+        self._site_source = plugin_base.make_plugin_source(
+            searchpath=self._site_plugin_path,
+            identifier='site_plugin-' + self._identifier)
         self._alternate_sources = {}
         self._format_versions = format_versions
 
+    def __getstate__(self):
+        state = self.__dict__.copy()  # shallow copy; the live context keeps its plugin sources
+        del state['_site_source']  # pluginbase sources are recreated in __setstate__, not pickled
+        state['_types'] = {}  # loaded plugin types are not pickled; presumably lookup() loads them again
+        state['_alternate_sources'] = {}  # local/pip sources are recreated on demand, under the same stable identifiers
+        return state
+
+    def __setstate__(self, state):
+        self.__dict__.update(state)
+        self._site_source = self._plugin_base.make_plugin_source(
+            searchpath=self._site_plugin_path,
+            identifier='site_plugin-' + self._identifier)
+
     # lookup():
     #
     # Fetches a type loaded from a plugin in this plugin context
@@ -80,7 +98,7 @@ class PluginContext():
     def _get_local_plugin_source(self, path):
         if ('local', path) not in self._alternate_sources:
             # key by a tuple to avoid collision
-            source = self._plugin_base.make_plugin_source(searchpath=[path])
+            source = self._plugin_base.make_plugin_source(searchpath=[path], identifier='local_plugin-' + path + '-' + self._identifier)
             # Ensure that sources never get garbage collected,
             # as they'll take the plugins with them.
             self._alternate_sources[('local', path)] = source
@@ -121,7 +139,7 @@ class PluginContext():
                 # The plugin didn't have an accompanying YAML file
                 defaults = None
 
-            source = self._plugin_base.make_plugin_source(searchpath=[os.path.dirname(location)])
+            source = self._plugin_base.make_plugin_source(searchpath=[os.path.dirname(location)], identifier='pip_plugin-' + package_name + '-' + self._identifier)  # package_name keeps each pip package's identifier unique
             self._alternate_sources[('pip', package_name)] = source
 
         else:
diff --git a/src/buildstream/_project.py b/src/buildstream/_project.py
index 1fdc84a..a2df5bce 100644
--- a/src/buildstream/_project.py
+++ b/src/buildstream/_project.py
@@ -86,6 +86,12 @@ class ProjectConfig:
         self.default_mirror = None               # The name of the preferred mirror.
         self._aliases = {}                       # Aliases dictionary
 
+    def __getstate__(self):
+        state = self.__dict__.copy()
+        del state["element_factory"]  # NOTE(review): an unpickled ProjectConfig has no factory attributes at all - confirm the child never reads them
+        del state["source_factory"]
+        return state
+
 
 # Project()
 #
@@ -97,6 +103,8 @@ class Project():
                  default_mirror=None, parent_loader=None,
                  search_for_project=True):
 
+        self._pass = None  # load pass in progress: None before loading, then 1, then 2
+
         # The project name
         self.name = None
 
@@ -622,6 +630,8 @@ class Project():
         config_no_include = _yaml.node_copy(self._default_config_node)
         _yaml.composite(config_no_include, project_conf_first_pass)
 
+        assert self._pass is None
+        self._pass = 1  # presumably picked up as pass_ when the plugin factories are built during this pass
         self._load_pass(config_no_include, self.first_pass_config,
                         ignore_unknown=True)
 
@@ -646,6 +656,8 @@ class Project():
         config = _yaml.node_copy(self._default_config_node)
         _yaml.composite(config, project_conf_second_pass)
 
+        assert self._pass == 1
+        self._pass = 2  # keeps this pass's factory identifiers distinct from the first pass's
         self._load_pass(config, self.config)
 
         self._validate_node(config)
@@ -919,10 +931,12 @@ class Project():
         pluginbase = PluginBase(package='buildstream.plugins')
         output.element_factory = ElementFactory(pluginbase,
                                                 plugin_origins=plugin_element_origins,
-                                                format_versions=element_format_versions)
+                                                format_versions=element_format_versions,
+                                                pass_=self._pass)
         output.source_factory = SourceFactory(pluginbase,
                                               plugin_origins=plugin_source_origins,
-                                              format_versions=source_format_versions)
+                                              format_versions=source_format_versions,
+                                              pass_=self._pass)
 
     # _store_origin()
     #
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index e11a9f9..4ca2ee4 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -21,7 +21,10 @@
 
 # System imports
 import enum
+import copyreg
+import io
 import os
+import pickle
 import sys
 import signal
 import datetime
@@ -32,7 +35,7 @@ import multiprocessing
 # BuildStream toplevel imports
 from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
 from ..._message import Message, MessageType, unconditional_messages
-from ... import _signals, utils
+from ... import _signals, utils, Plugin, Element, Source
 
 
 # Return code values shutdown of job handling child processes
@@ -87,6 +90,80 @@ class _MessageType(enum.Enum):
     SUBCLASS_CUSTOM_MESSAGE = 5
 
 
+def _reduce_element(element):  # pickle reducer, registered per Element subclass by _pickle_child_job
+    assert isinstance(element, Element)
+    meta_kind = element._meta_kind
+    project = element._get_project()
+    factory = project.config.element_factory
+    args = (factory, meta_kind)  # the factory is pickled too, so the unpickler can look the class up again
+    state = element.__dict__.copy()
+    del state["_Element__reverse_dependencies"]  # NOTE(review): omitted, so the unpickled element has no such attribute - confirm unused in the child
+    return (_unreduce_plugin, args, state)  # pickle calls _unreduce_plugin(*args), then applies state to the result
+
+
+def _reduce_source(source):  # pickle reducer, registered per Source subclass by _pickle_child_job
+    assert isinstance(source, Source)
+    meta_kind = source._meta_kind
+    project = source._get_project()
+    factory = project.config.source_factory
+    args = (factory, meta_kind)  # the factory is pickled too, so the unpickler can look the class up again
+    return (_unreduce_plugin, args, source.__dict__.copy())  # pickle calls _unreduce_plugin(*args), then applies the full state
+
+
+def _unreduce_plugin(factory, meta_kind):  # counterpart of _reduce_element/_reduce_source
+    cls, _ = factory.lookup(meta_kind)  # resolve the plugin class for this kind via the unpickled factory
+    plugin = cls.__new__(cls)  # skip __init__; pickle restores the instance __dict__ from the saved state afterwards
+
+    # TODO: find a better way of persisting this factory, otherwise the plugin
+    # will become invalid.
+    plugin.factory = factory
+
+    return plugin
+
+
+def _pickle_child_job(child_job, context):  # returns a rewound BytesIO holding the pickled child_job
+
+    # Note: Another way of doing this would be to let PluginBase do its
+    # import-magic. We would achieve this by first pickling the factories, and
+    # the string names of their plugins. Unpickling the plugins in the child
+    # process would then "just work". There would be an additional cost of
+    # having to load every plugin kind, regardless of which ones are used.
+
+    projects = context.get_projects()
+    element_classes = [  # only the element classes already loaded into each project's factory
+        cls
+        for p in projects
+        for cls, _ in p.config.element_factory._types.values()
+    ]
+    source_classes = [  # likewise for source classes
+        cls
+        for p in projects
+        for cls, _ in p.config.source_factory._types.values()
+    ]
+
+    data = io.BytesIO()
+    pickler = pickle.Pickler(data)
+    pickler.dispatch_table = copyreg.dispatch_table.copy()  # private copy, so the global copyreg table is left untouched
+    for cls in element_classes:
+        pickler.dispatch_table[cls] = _reduce_element  # dispatch_table is keyed on the exact class of each object
+    for cls in source_classes:
+        pickler.dispatch_table[cls] = _reduce_source
+    pickler.dump(child_job)
+    data.seek(0)  # rewind so the reader unpickles from the start
+
+    return data
+
+
+def _unpickle_child_job(pickled):
+    child_job = pickle.load(pickled)  # pickled: presumably the rewound file object from _pickle_child_job
+    return child_job
+
+
+def _do_pickled_child_job(pickled, *child_args):
+    child_job = _unpickle_child_job(pickled)
+    return child_job.child_action(*child_args)  # NOTE(review): presumably the child-process entry point; nothing in this patch calls it yet
+
+
 # Job()
 #
 # The Job object represents a task that will run in parallel to the main
diff --git a/src/buildstream/_sourcecache.py b/src/buildstream/_sourcecache.py
index ce0694e..96d874b 100644
--- a/src/buildstream/_sourcecache.py
+++ b/src/buildstream/_sourcecache.py
@@ -58,6 +58,12 @@ class SourceCache(BaseCache):
         self.casquota.add_remove_callbacks(self.unrequired_sources, self.cas.remove)
         self.casquota.add_list_refs_callback(self.list_sources)
 
+    def __getstate__(self):
+        state = self.__dict__.copy()  # shallow copy, so the live cache is not modified
+        # TODO: actually pickle the required sources, so they resolve to the same Source objects as the rest of the pickle.
+        state['_required_sources'] = set()  # NOTE(review): the unpickled cache starts with no required sources - confirm the child never relies on them
+        return state
+
     # mark_required_sources()
     #
     # Mark sources that are required by the current run.
diff --git a/src/buildstream/_sourcefactory.py b/src/buildstream/_sourcefactory.py
index 1d959a1..eca4b50 100644
--- a/src/buildstream/_sourcefactory.py
+++ b/src/buildstream/_sourcefactory.py
@@ -33,9 +33,10 @@ class SourceFactory(PluginContext):
 
     def __init__(self, plugin_base, *,
                  format_versions={},
-                 plugin_origins=None):
+                 plugin_origins=None,
+                 pass_=None):
 
-        super().__init__(plugin_base, Source, [_site.source_plugins],
+        super().__init__(plugin_base, Source, [_site.source_plugins], 'source' + str(pass_),  # NOTE(review): unlike ElementFactory there is no `pass_ is not None` check, so a missing pass_ yields 'sourceNone'
                          format_versions=format_versions,
                          plugin_origins=plugin_origins)
 
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index a605460..359e183 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -186,6 +186,7 @@ class Element(Plugin):
     """
 
     def __init__(self, context, project, meta, plugin_conf):
+        self._meta_kind = meta.kind  # kept so the job pickling reducers can look this plugin's class up again by kind
 
         self.__cache_key_dict = None            # Dict for cache key calculation
         self.__cache_key = None                 # Our cached cache key
diff --git a/src/buildstream/source.py b/src/buildstream/source.py
index 9fc9cf1..90db130 100644
--- a/src/buildstream/source.py
+++ b/src/buildstream/source.py
@@ -312,6 +312,8 @@ class Source(Plugin):
         super().__init__("{}-{}".format(meta.element_name, meta.element_index),
                          context, project, provenance, "source", unique_id=unique_id)
 
+        self._meta_kind = meta.kind  # kept so the job pickling reducers can look this plugin's class up again by kind
+
         self.__source_cache = context.sourcecache
 
         self.__element_name = meta.element_name         # The name of the element owning this source