You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:32:52 UTC

[buildstream] 02/22: WIP: pickleable jobs

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch aevri/win32_minimal_seemstowork_20190829
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 43021072636526991f89edf336e21fd658c47452
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Wed Jul 10 09:24:30 2019 +0100

    WIP: pickleable jobs
    
    WIP: make some things pickleable
    
    WIP: Make ElementFactory and SourceFactory picklable
    
    WIP: job: pickling machinery for jobs
    
    WIP: loader: don't pickle _fetch_subprojects
    
    Note that a new circular dependency was introduced in:
    
        737aea18 Handle subproject fetching in the Stream class
    
    We can avoid the complications by not pickling it out to the child job.
    
    WIP: move reducers to jobpickler
    
    WIP: jobpickler: use proto serialization instead of pickle
    
    WIP: jobpickler: pickle less
    
    WIP: job: pickle apologies
---
 src/buildstream/_elementfactory.py            |   7 +-
 src/buildstream/_plugincontext.py             |  26 ++++-
 src/buildstream/_project.py                   |  12 +-
 src/buildstream/_scheduler/jobs/jobpickler.py | 151 ++++++++++++++++++++++++++
 src/buildstream/_sourcefactory.py             |   5 +-
 src/buildstream/element.py                    |   1 +
 src/buildstream/source.py                     |   2 +
 7 files changed, 194 insertions(+), 10 deletions(-)

diff --git a/src/buildstream/_elementfactory.py b/src/buildstream/_elementfactory.py
index d6591bf..b2a7f73 100644
--- a/src/buildstream/_elementfactory.py
+++ b/src/buildstream/_elementfactory.py
@@ -33,9 +33,12 @@ class ElementFactory(PluginContext):
 
     def __init__(self, plugin_base, *,
                  format_versions={},
-                 plugin_origins=None):
+                 plugin_origins=None,
+                 pass_=None):
 
-        super().__init__(plugin_base, Element, [_site.element_plugins],
+        assert pass_ is not None
+
+        super().__init__(plugin_base, Element, [_site.element_plugins], 'element' + str(pass_),
                          plugin_origins=plugin_origins,
                          format_versions=format_versions)
 
diff --git a/src/buildstream/_plugincontext.py b/src/buildstream/_plugincontext.py
index 7fef9b9..fa07e7b 100644
--- a/src/buildstream/_plugincontext.py
+++ b/src/buildstream/_plugincontext.py
@@ -44,9 +44,11 @@ from . import _yaml
 #
 class PluginContext():
 
-    def __init__(self, plugin_base, base_type, site_plugin_path, *,
+    def __init__(self, plugin_base, base_type, site_plugin_path, identifier, *,
                  plugin_origins=None, format_versions={}):
 
+        self._identifier = identifier
+
         # The plugin kinds which were loaded
         self.loaded_dependencies = []
 
@@ -59,10 +61,26 @@ class PluginContext():
 
         # The PluginSource object
         self._plugin_base = plugin_base
-        self._site_source = plugin_base.make_plugin_source(searchpath=site_plugin_path)
+        self._site_plugin_path = site_plugin_path
+        self._site_source = plugin_base.make_plugin_source(
+            searchpath=self._site_plugin_path,
+            identifier='site_plugin-' + self._identifier)
         self._alternate_sources = {}
         self._format_versions = format_versions
 
+    def __getstate__(self):
+        import copy
+        state = copy.copy(self.__dict__)
+        del state['_site_source']
+        state['_types'] = {}
+        return state
+
+    def __setstate__(self, state):
+        self.__dict__ = state
+        self._site_source = self._plugin_base.make_plugin_source(
+            searchpath=self._site_plugin_path,
+            identifier='site_plugin-' + self._identifier)
+
     # lookup():
     #
     # Fetches a type loaded from a plugin in this plugin context
@@ -80,7 +98,7 @@ class PluginContext():
     def _get_local_plugin_source(self, path):
         if ('local', path) not in self._alternate_sources:
             # key by a tuple to avoid collision
-            source = self._plugin_base.make_plugin_source(searchpath=[path])
+            source = self._plugin_base.make_plugin_source(searchpath=[path], identifier='local_plugin-' + path + '-' + self._identifier)
             # Ensure that sources never get garbage collected,
             # as they'll take the plugins with them.
             self._alternate_sources[('local', path)] = source
@@ -121,7 +139,7 @@ class PluginContext():
                 # The plugin didn't have an accompanying YAML file
                 defaults = None
 
-            source = self._plugin_base.make_plugin_source(searchpath=[os.path.dirname(location)])
+            source = self._plugin_base.make_plugin_source(searchpath=[os.path.dirname(location)], identifier='pip_plugin-' + self._identifier)
             self._alternate_sources[('pip', package_name)] = source
 
         else:
diff --git a/src/buildstream/_project.py b/src/buildstream/_project.py
index 5f433c0..74fca7f 100644
--- a/src/buildstream/_project.py
+++ b/src/buildstream/_project.py
@@ -97,6 +97,8 @@ class Project():
                  default_mirror=None, parent_loader=None,
                  search_for_project=True, fetch_subprojects=None):
 
+        self._pass = None
+
         # The project name
         self.name = None
 
@@ -617,6 +619,8 @@ class Project():
         config_no_include = _yaml.node_copy(self._default_config_node)
         _yaml.composite(config_no_include, project_conf_first_pass)
 
+        assert self._pass is None
+        self._pass = 1
         self._load_pass(config_no_include, self.first_pass_config,
                         ignore_unknown=True)
 
@@ -641,6 +645,8 @@ class Project():
         config = _yaml.node_copy(self._default_config_node)
         _yaml.composite(config, project_conf_second_pass)
 
+        assert self._pass == 1
+        self._pass = 2
         self._load_pass(config, self.config)
 
         self._validate_node(config)
@@ -914,10 +920,12 @@ class Project():
         pluginbase = PluginBase(package='buildstream.plugins')
         output.element_factory = ElementFactory(pluginbase,
                                                 plugin_origins=plugin_element_origins,
-                                                format_versions=element_format_versions)
+                                                format_versions=element_format_versions,
+                                                pass_=self._pass)
         output.source_factory = SourceFactory(pluginbase,
                                               plugin_origins=plugin_source_origins,
-                                              format_versions=source_format_versions)
+                                              format_versions=source_format_versions,
+                                              pass_=self._pass)
 
     # _store_origin()
     #
diff --git a/src/buildstream/_scheduler/jobs/jobpickler.py b/src/buildstream/_scheduler/jobs/jobpickler.py
new file mode 100644
index 0000000..5c1742f
--- /dev/null
+++ b/src/buildstream/_scheduler/jobs/jobpickler.py
@@ -0,0 +1,151 @@
+#
+#  Copyright (C) 2019 Bloomberg Finance LP
+#
+#  This program is free software; you can redistribute it and/or
+#  modify it under the terms of the GNU Lesser General Public
+#  License as published by the Free Software Foundation; either
+#  version 2 of the License, or (at your option) any later version.
+#
+#  This library is distributed in the hope that it will be useful,
+#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+#  Lesser General Public License for more details.
+#
+#  You should have received a copy of the GNU Lesser General Public
+#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+#  Authors:
+#        Angelos Evripiotis <je...@bloomberg.net>
+
+
+import copyreg
+import io
+import pickle
+
+from ..._protos.buildstream.v2.artifact_pb2 import Artifact as ArtifactProto
+
+# BuildStream toplevel imports
+from ... import Element, Source
+from ..._loader import Loader
+from ..._messenger import Messenger
+
+
+def _reduce_artifact_proto(instance):
+    assert isinstance(instance, ArtifactProto)
+    data = instance.SerializeToString()
+    return (_unreduce_artifact_proto, (data,))
+
+
+def _unreduce_artifact_proto(data):
+    instance = ArtifactProto()
+    instance.ParseFromString(data)
+    return instance
+
+
+def _reduce_loader(instance):
+    assert isinstance(instance, Loader)
+    state = instance.__dict__.copy()
+
+    # When pickling a Loader over to the ChildJob, we don't want to bring
+    # the whole Stream over with it. The _fetch_subprojects member is a method
+    # of the Stream. We also don't want to remove it in the main process. If we
+    # remove it in the child process then we will already be too late. The only
+    # time that seems just right is here, when preparing the child process'
+    # copy of the Loader.
+    #
+    del state['_fetch_subprojects']
+
+    return (Loader.__new__, (Loader,), state)
+
+
+def _reduce_messenger(instance):
+    assert isinstance(instance, Messenger)
+    state = instance.__dict__.copy()
+
+    # When pickling a Messenger over to the ChildJob, we don't want to bring
+    # the whole _message_handler over with it. We also don't want to remove it
+    # in the main process. If we remove it in the child process then we will
+    # already be too late. The only time that seems just right is here, when
+    # preparing the child process' copy of the Messenger.
+    #
+    # Another approach might be to use a context manager on the Messenger,
+    # which removes and restores the _message_handler. This wouldn't require
+    # access to private details of Messenger.
+    #
+    del state['_message_handler']
+
+    return (Messenger.__new__, (Messenger,), state)
+
+
+def _reduce_element(element):
+    assert isinstance(element, Element)
+    meta_kind = element._meta_kind
+    project = element._get_project()
+    factory = project.config.element_factory
+    args = (factory, meta_kind)
+    state = element.__dict__.copy()
+    state["_Element__reverse_dependencies"] = None
+    state["_Element__buildable_callback"] = None
+    return (_unreduce_plugin, args, state)
+
+
+def _reduce_source(source):
+    assert isinstance(source, Source)
+    meta_kind = source._meta_kind
+    project = source._get_project()
+    factory = project.config.source_factory
+    args = (factory, meta_kind)
+    return (_unreduce_plugin, args, source.__dict__.copy())
+
+
+def _unreduce_plugin(factory, meta_kind):
+    cls, _ = factory.lookup(meta_kind)
+    plugin = cls.__new__(cls)
+
+    # TODO: find a better way of persisting this factory, otherwise the plugin
+    # will become invalid.
+    plugin.factory = factory
+
+    return plugin
+
+
+def pickle_child_job(child_job, context):
+
+    # Note: Another way of doing this would be to let PluginBase do its
+    # import-magic. We would achieve this by first pickling the factories, and
+    # the string names of their plugins. Unpickling the plugins in the child
+    # process would then "just work". There would be an additional cost of
+    # having to load every plugin kind, regardless of which ones are used.
+
+    projects = context.get_projects()
+    element_classes = [
+        cls
+        for p in projects
+        for cls, _ in p.config.element_factory._types.values()
+    ]
+    source_classes = [
+        cls
+        for p in projects
+        for cls, _ in p.config.source_factory._types.values()
+    ]
+
+    data = io.BytesIO()
+    pickler = pickle.Pickler(data)
+    pickler.dispatch_table = copyreg.dispatch_table.copy()
+    for cls in element_classes:
+        pickler.dispatch_table[cls] = _reduce_element
+    for cls in source_classes:
+        pickler.dispatch_table[cls] = _reduce_source
+    pickler.dispatch_table[ArtifactProto] = _reduce_artifact_proto
+    pickler.dispatch_table[Loader] = _reduce_loader
+    pickler.dispatch_table[Messenger] = _reduce_messenger
+
+    pickler.dump(child_job)
+    data.seek(0)
+
+    return data
+
+
+def unpickle_child_job(pickled):
+    child_job = pickle.load(pickled)
+    return child_job
diff --git a/src/buildstream/_sourcefactory.py b/src/buildstream/_sourcefactory.py
index 1d959a1..eca4b50 100644
--- a/src/buildstream/_sourcefactory.py
+++ b/src/buildstream/_sourcefactory.py
@@ -33,9 +33,10 @@ class SourceFactory(PluginContext):
 
     def __init__(self, plugin_base, *,
                  format_versions={},
-                 plugin_origins=None):
+                 plugin_origins=None,
+                 pass_=None):
 
-        super().__init__(plugin_base, Source, [_site.source_plugins],
+        super().__init__(plugin_base, Source, [_site.source_plugins], 'source' + str(pass_),
                          format_versions=format_versions,
                          plugin_origins=plugin_origins)
 
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 758a0b9..7061a27 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -186,6 +186,7 @@ class Element(Plugin):
     """
 
     def __init__(self, context, project, meta, plugin_conf):
+        self._meta_kind = meta.kind
 
         self.__cache_key_dict = None            # Dict for cache key calculation
         self.__cache_key = None                 # Our cached cache key
diff --git a/src/buildstream/source.py b/src/buildstream/source.py
index f8b5d3f..202af83 100644
--- a/src/buildstream/source.py
+++ b/src/buildstream/source.py
@@ -312,6 +312,8 @@ class Source(Plugin):
         super().__init__("{}-{}".format(meta.element_name, meta.element_index),
                          context, project, provenance, "source", unique_id=unique_id)
 
+        self._meta_kind = meta.kind
+
         self.__element_name = meta.element_name         # The name of the element owning this source
         self.__element_index = meta.element_index       # The index of the source in the owning element's source list
         self.__element_kind = meta.element_kind         # The kind of the element owning this source