You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:32:52 UTC
[buildstream] 02/22: WIP: pickleable jobs
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch aevri/win32_minimal_seemstowork_20190829
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 43021072636526991f89edf336e21fd658c47452
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Wed Jul 10 09:24:30 2019 +0100
WIP: pickleable jobs
WIP: make some things pickleable
WIP: Make ElementFactory and SourceFactory picklable
WIP: job: pickling machinery for jobs
WIP: loader: don't pickle _fetch_subprojects
Note that a new circular dependency was introduced in:
737aea18 Handle subproject fetching in the Stream class
We can avoid the complications by not pickling it out to the child job.
WIP: move reducers to jobpickler
WIP: jobpickler: use proto serialization instead of pickle
WIP: jobpickler: pickle less
WIP: job: pickle apologies
---
src/buildstream/_elementfactory.py | 7 +-
src/buildstream/_plugincontext.py | 26 ++++-
src/buildstream/_project.py | 12 +-
src/buildstream/_scheduler/jobs/jobpickler.py | 151 ++++++++++++++++++++++++++
src/buildstream/_sourcefactory.py | 5 +-
src/buildstream/element.py | 1 +
src/buildstream/source.py | 2 +
7 files changed, 194 insertions(+), 10 deletions(-)
diff --git a/src/buildstream/_elementfactory.py b/src/buildstream/_elementfactory.py
index d6591bf..b2a7f73 100644
--- a/src/buildstream/_elementfactory.py
+++ b/src/buildstream/_elementfactory.py
@@ -33,9 +33,12 @@ class ElementFactory(PluginContext):
def __init__(self, plugin_base, *,
format_versions={},
- plugin_origins=None):
+ plugin_origins=None,
+ pass_=None):
- super().__init__(plugin_base, Element, [_site.element_plugins],
+ assert pass_ is not None
+
+ super().__init__(plugin_base, Element, [_site.element_plugins], 'element' + str(pass_),
plugin_origins=plugin_origins,
format_versions=format_versions)
diff --git a/src/buildstream/_plugincontext.py b/src/buildstream/_plugincontext.py
index 7fef9b9..fa07e7b 100644
--- a/src/buildstream/_plugincontext.py
+++ b/src/buildstream/_plugincontext.py
@@ -44,9 +44,11 @@ from . import _yaml
#
class PluginContext():
- def __init__(self, plugin_base, base_type, site_plugin_path, *,
+ def __init__(self, plugin_base, base_type, site_plugin_path, identifier, *,
plugin_origins=None, format_versions={}):
+ self._identifier = identifier
+
# The plugin kinds which were loaded
self.loaded_dependencies = []
@@ -59,10 +61,26 @@ class PluginContext():
# The PluginSource object
self._plugin_base = plugin_base
- self._site_source = plugin_base.make_plugin_source(searchpath=site_plugin_path)
+ self._site_plugin_path = site_plugin_path
+ self._site_source = plugin_base.make_plugin_source(
+ searchpath=self._site_plugin_path,
+ identifier='site_plugin-' + self._identifier)
self._alternate_sources = {}
self._format_versions = format_versions
+ def __getstate__(self):
+ import copy
+ state = copy.copy(self.__dict__)
+ del state['_site_source']
+ state['_types'] = {}
+ return state
+
+ def __setstate__(self, state):
+ self.__dict__ = state
+ self._site_source = self._plugin_base.make_plugin_source(
+ searchpath=self._site_plugin_path,
+ identifier='site_plugin-' + self._identifier)
+
# lookup():
#
# Fetches a type loaded from a plugin in this plugin context
@@ -80,7 +98,7 @@ class PluginContext():
def _get_local_plugin_source(self, path):
if ('local', path) not in self._alternate_sources:
# key by a tuple to avoid collision
- source = self._plugin_base.make_plugin_source(searchpath=[path])
+ source = self._plugin_base.make_plugin_source(searchpath=[path], identifier='local_plugin-' + path + '-' + self._identifier)
# Ensure that sources never get garbage collected,
# as they'll take the plugins with them.
self._alternate_sources[('local', path)] = source
@@ -121,7 +139,7 @@ class PluginContext():
# The plugin didn't have an accompanying YAML file
defaults = None
- source = self._plugin_base.make_plugin_source(searchpath=[os.path.dirname(location)])
+ source = self._plugin_base.make_plugin_source(searchpath=[os.path.dirname(location)], identifier='pip_plugin-' + self._identifier)
self._alternate_sources[('pip', package_name)] = source
else:
diff --git a/src/buildstream/_project.py b/src/buildstream/_project.py
index 5f433c0..74fca7f 100644
--- a/src/buildstream/_project.py
+++ b/src/buildstream/_project.py
@@ -97,6 +97,8 @@ class Project():
default_mirror=None, parent_loader=None,
search_for_project=True, fetch_subprojects=None):
+ self._pass = None
+
# The project name
self.name = None
@@ -617,6 +619,8 @@ class Project():
config_no_include = _yaml.node_copy(self._default_config_node)
_yaml.composite(config_no_include, project_conf_first_pass)
+ assert self._pass is None
+ self._pass = 1
self._load_pass(config_no_include, self.first_pass_config,
ignore_unknown=True)
@@ -641,6 +645,8 @@ class Project():
config = _yaml.node_copy(self._default_config_node)
_yaml.composite(config, project_conf_second_pass)
+ assert self._pass == 1
+ self._pass = 2
self._load_pass(config, self.config)
self._validate_node(config)
@@ -914,10 +920,12 @@ class Project():
pluginbase = PluginBase(package='buildstream.plugins')
output.element_factory = ElementFactory(pluginbase,
plugin_origins=plugin_element_origins,
- format_versions=element_format_versions)
+ format_versions=element_format_versions,
+ pass_=self._pass)
output.source_factory = SourceFactory(pluginbase,
plugin_origins=plugin_source_origins,
- format_versions=source_format_versions)
+ format_versions=source_format_versions,
+ pass_=self._pass)
# _store_origin()
#
diff --git a/src/buildstream/_scheduler/jobs/jobpickler.py b/src/buildstream/_scheduler/jobs/jobpickler.py
new file mode 100644
index 0000000..5c1742f
--- /dev/null
+++ b/src/buildstream/_scheduler/jobs/jobpickler.py
@@ -0,0 +1,151 @@
+#
+# Copyright (C) 2019 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+# Angelos Evripiotis <je...@bloomberg.net>
+
+
+import copyreg
+import io
+import pickle
+
+from ..._protos.buildstream.v2.artifact_pb2 import Artifact as ArtifactProto
+
+# BuildStream toplevel imports
+from ... import Element, Source
+from ..._loader import Loader
+from ..._messenger import Messenger
+
+
+def _reduce_artifact_proto(instance):
+ assert isinstance(instance, ArtifactProto)
+ data = instance.SerializeToString()
+ return (_unreduce_artifact_proto, (data,))
+
+
+def _unreduce_artifact_proto(data):
+ instance = ArtifactProto()
+ instance.ParseFromString(data)
+ return instance
+
+
+def _reduce_loader(instance):
+ assert isinstance(instance, Loader)
+ state = instance.__dict__.copy()
+
+ # When pickling a Loader over to the ChildJob, we don't want to bring
+ # the whole Stream over with it. The _fetch_subprojects member is a method
+ # of the Stream. We also don't want to remove it in the main process. If we
+ # remove it in the child process then we will already be too late. The only
+ # time that seems just right is here, when preparing the child process'
+ # copy of the Loader.
+ #
+ del state['_fetch_subprojects']
+
+ return (Loader.__new__, (Loader,), state)
+
+
+def _reduce_messenger(instance):
+ assert isinstance(instance, Messenger)
+ state = instance.__dict__.copy()
+
+ # When pickling a Messenger over to the ChildJob, we don't want to bring
+ # the whole _message_handler over with it. We also don't want to remove it
+ # in the main process. If we remove it in the child process then we will
+ # already be too late. The only time that seems just right is here, when
+ # preparing the child process' copy of the Messenger.
+ #
+ # Another approach might be to use a context manager on the Messenger,
+ # which removes and restores the _message_handler. This wouldn't require
+ # access to private details of Messenger.
+ #
+ del state['_message_handler']
+
+ return (Messenger.__new__, (Messenger,), state)
+
+
+def _reduce_element(element):
+ assert isinstance(element, Element)
+ meta_kind = element._meta_kind
+ project = element._get_project()
+ factory = project.config.element_factory
+ args = (factory, meta_kind)
+ state = element.__dict__.copy()
+ state["_Element__reverse_dependencies"] = None
+ state["_Element__buildable_callback"] = None
+ return (_unreduce_plugin, args, state)
+
+
+def _reduce_source(source):
+ assert isinstance(source, Source)
+ meta_kind = source._meta_kind
+ project = source._get_project()
+ factory = project.config.source_factory
+ args = (factory, meta_kind)
+ return (_unreduce_plugin, args, source.__dict__.copy())
+
+
+def _unreduce_plugin(factory, meta_kind):
+ cls, _ = factory.lookup(meta_kind)
+ plugin = cls.__new__(cls)
+
+ # TODO: find a better way of persisting this factory, otherwise the plugin
+ # will become invalid.
+ plugin.factory = factory
+
+ return plugin
+
+
+def pickle_child_job(child_job, context):
+
+ # Note: Another way of doing this would be to let PluginBase do its
+ # import-magic. We would achieve this by first pickling the factories, and
+ # the string names of their plugins. Unpickling the plugins in the child
+ # process would then "just work". There would be an additional cost of
+ # having to load every plugin kind, regardless of which ones are used.
+
+ projects = context.get_projects()
+ element_classes = [
+ cls
+ for p in projects
+ for cls, _ in p.config.element_factory._types.values()
+ ]
+ source_classes = [
+ cls
+ for p in projects
+ for cls, _ in p.config.source_factory._types.values()
+ ]
+
+ data = io.BytesIO()
+ pickler = pickle.Pickler(data)
+ pickler.dispatch_table = copyreg.dispatch_table.copy()
+ for cls in element_classes:
+ pickler.dispatch_table[cls] = _reduce_element
+ for cls in source_classes:
+ pickler.dispatch_table[cls] = _reduce_source
+ pickler.dispatch_table[ArtifactProto] = _reduce_artifact_proto
+ pickler.dispatch_table[Loader] = _reduce_loader
+ pickler.dispatch_table[Messenger] = _reduce_messenger
+
+ pickler.dump(child_job)
+ data.seek(0)
+
+ return data
+
+
+def unpickle_child_job(pickled):
+ child_job = pickle.load(pickled)
+ return child_job
diff --git a/src/buildstream/_sourcefactory.py b/src/buildstream/_sourcefactory.py
index 1d959a1..eca4b50 100644
--- a/src/buildstream/_sourcefactory.py
+++ b/src/buildstream/_sourcefactory.py
@@ -33,9 +33,10 @@ class SourceFactory(PluginContext):
def __init__(self, plugin_base, *,
format_versions={},
- plugin_origins=None):
+ plugin_origins=None,
+ pass_=None):
- super().__init__(plugin_base, Source, [_site.source_plugins],
+ super().__init__(plugin_base, Source, [_site.source_plugins], 'source' + str(pass_),
format_versions=format_versions,
plugin_origins=plugin_origins)
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 758a0b9..7061a27 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -186,6 +186,7 @@ class Element(Plugin):
"""
def __init__(self, context, project, meta, plugin_conf):
+ self._meta_kind = meta.kind
self.__cache_key_dict = None # Dict for cache key calculation
self.__cache_key = None # Our cached cache key
diff --git a/src/buildstream/source.py b/src/buildstream/source.py
index f8b5d3f..202af83 100644
--- a/src/buildstream/source.py
+++ b/src/buildstream/source.py
@@ -312,6 +312,8 @@ class Source(Plugin):
super().__init__("{}-{}".format(meta.element_name, meta.element_index),
context, project, provenance, "source", unique_id=unique_id)
+ self._meta_kind = meta.kind
+
self.__element_name = meta.element_name # The name of the element owning this source
self.__element_index = meta.element_index # The index of the source in the owning element's source list
self.__element_kind = meta.element_kind # The kind of the element owning this source