You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:30:10 UTC
[buildstream] 02/21: WIP: pickle things
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch aevri/win32
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 05893e1dad07927302974112ec904805029b50cb
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Tue Jun 18 15:42:59 2019 +0100
WIP: pickle things
---
src/buildstream/_artifactcache.py | 6 +++
src/buildstream/_context.py | 5 +++
src/buildstream/_elementfactory.py | 7 ++-
src/buildstream/_options/optionpool.py | 12 ++++++
src/buildstream/_plugincontext.py | 26 +++++++++--
src/buildstream/_project.py | 18 +++++++-
src/buildstream/_scheduler/jobs/job.py | 79 +++++++++++++++++++++++++++++++++-
src/buildstream/_sourcecache.py | 6 +++
src/buildstream/_sourcefactory.py | 5 ++-
src/buildstream/element.py | 1 +
src/buildstream/source.py | 2 +
11 files changed, 156 insertions(+), 11 deletions(-)
diff --git a/src/buildstream/_artifactcache.py b/src/buildstream/_artifactcache.py
index de17ea7..a4497ba 100644
--- a/src/buildstream/_artifactcache.py
+++ b/src/buildstream/_artifactcache.py
@@ -103,6 +103,12 @@ class ArtifactCache(BaseCache):
self.cas.add_reachable_directories_callback(self._reachable_directories)
self.cas.add_reachable_digests_callback(self._reachable_digests)
+ # __getstate__()
+ #
+ # Pickle support. Returns a shallow copy of the instance dict in which
+ # '_required_elements' is replaced by an empty set. No Element objects are
+ # pickled via that attribute, so the unpickled cache starts with no
+ # required elements.
+ #
+ def __getstate__(self):
+ state = self.__dict__.copy()
+ # TODO: actually pickle the elements, resolving to the same objects.
+ state['_required_elements'] = set()
+ return state
+
# mark_required_elements():
#
# Mark elements whose artifacts are required for the current run.
diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py
index 3f6e6ac..d9ef1f1 100644
--- a/src/buildstream/_context.py
+++ b/src/buildstream/_context.py
@@ -168,6 +168,11 @@ class Context():
self._casquota = None
self._directory = directory
+ # __getstate__()
+ #
+ # Pickle support. Returns a shallow copy of the instance dict without the
+ # '_message_handler' entry. The reason is presumably that the handler
+ # callable may not be picklable (confirm). As a result, an unpickled
+ # Context gets no instance-level _message_handler unless one is assigned
+ # afterwards.
+ #
+ # NOTE(review): `del` raises KeyError if '_message_handler' is missing from
+ # the instance dict; this assumes __init__ always sets it.
+ #
+ def __getstate__(self):
+ state = self.__dict__.copy()
+ del state['_message_handler']
+ return state
+
# load()
#
# Loads the configuration files
diff --git a/src/buildstream/_elementfactory.py b/src/buildstream/_elementfactory.py
index d6591bf..b2a7f73 100644
--- a/src/buildstream/_elementfactory.py
+++ b/src/buildstream/_elementfactory.py
@@ -33,9 +33,12 @@ class ElementFactory(PluginContext):
def __init__(self, plugin_base, *,
format_versions={},
- plugin_origins=None):
+ plugin_origins=None,
+ pass_=None):
- super().__init__(plugin_base, Element, [_site.element_plugins],
+ assert pass_ is not None
+
+ super().__init__(plugin_base, Element, [_site.element_plugins], 'element' + str(pass_),
plugin_origins=plugin_origins,
format_versions=format_versions)
diff --git a/src/buildstream/_options/optionpool.py b/src/buildstream/_options/optionpool.py
index de3af3e..d5dea39 100644
--- a/src/buildstream/_options/optionpool.py
+++ b/src/buildstream/_options/optionpool.py
@@ -56,6 +56,18 @@ class OptionPool():
self._environment = jinja2.Environment(undefined=jinja2.StrictUndefined)
self._environment.globals = []
+ # __getstate__()
+ #
+ # Pickle support. Returns a shallow copy of the instance dict without the
+ # jinja2 Environment; __setstate__() below rebuilds it after unpickling.
+ #
+ def __getstate__(self):
+ state = self.__dict__.copy()
+ del state['_environment']
+ return state
+
+ # __setstate__()
+ #
+ # Restores the pickled attributes, then recreates the jinja2 Environment
+ # with the same settings as the setup shown just before these methods:
+ # StrictUndefined, with globals replaced by an empty list.
+ #
+ def __setstate__(self, state):
+ self.__dict__.update(state)
+
+ # jinja2 environment, with default globals cleared out of the way
+ self._environment = jinja2.Environment(undefined=jinja2.StrictUndefined)
+ self._environment.globals = []
+
# load()
#
# Loads the options described in the project.conf
diff --git a/src/buildstream/_plugincontext.py b/src/buildstream/_plugincontext.py
index 7a5407c..0d322c1 100644
--- a/src/buildstream/_plugincontext.py
+++ b/src/buildstream/_plugincontext.py
@@ -42,10 +42,12 @@ from . import _yaml
#
class PluginContext():
- def __init__(self, plugin_base, base_type, site_plugin_path, *,
+ def __init__(self, plugin_base, base_type, site_plugin_path, identifier, *,
plugin_origins=None, dependencies=None,
format_versions={}):
+ self._identifier = identifier
+
# The plugin kinds which were loaded
self.loaded_dependencies = []
@@ -59,10 +61,26 @@ class PluginContext():
# The PluginSource object
self._plugin_base = plugin_base
- self._site_source = plugin_base.make_plugin_source(searchpath=site_plugin_path)
+ self._site_plugin_path = site_plugin_path
+ self._site_source = plugin_base.make_plugin_source(
+ searchpath=self._site_plugin_path,
+ identifier='site_plugin-' + self._identifier)
self._alternate_sources = {}
self._format_versions = format_versions
+ # __getstate__()
+ #
+ # Pickle support. PluginSource objects are not pickled. The site source is
+ # recreated by __setstate__(). The alternate (local / pip) sources are
+ # recreated when next needed, because the loaded-types cache is cleared
+ # as well and every plugin kind is looked up again.
+ #
+ def __getstate__(self):
+ state = self.__dict__.copy()
+ del state['_site_source']
+ # These are PluginSource objects too. Like _site_source, they wrap module
+ # objects and cannot be pickled. _get_local_plugin_source() and the pip
+ # branch of the loader rebuild them with the same identifiers.
+ state['_alternate_sources'] = {}
+ state['_types'] = {}
+ return state
+
+ # __setstate__()
+ #
+ # Restores the pickled attributes, then recreates the site PluginSource
+ # with the same search path and identifier that __init__ uses.
+ #
+ def __setstate__(self, state):
+ self.__dict__.update(state)
+ self._site_source = self._plugin_base.make_plugin_source(
+ searchpath=self._site_plugin_path,
+ identifier='site_plugin-' + self._identifier)
+
# lookup():
#
# Fetches a type loaded from a plugin in this plugin context
@@ -80,7 +98,7 @@ class PluginContext():
def _get_local_plugin_source(self, path):
if ('local', path) not in self._alternate_sources:
# key by a tuple to avoid collision
- source = self._plugin_base.make_plugin_source(searchpath=[path])
+ source = self._plugin_base.make_plugin_source(searchpath=[path], identifier='local_plugin-' + path + '-' + self._identifier)
# Ensure that sources never get garbage collected,
# as they'll take the plugins with them.
self._alternate_sources[('local', path)] = source
@@ -121,7 +139,7 @@ class PluginContext():
# The plugin didn't have an accompanying YAML file
defaults = None
- source = self._plugin_base.make_plugin_source(searchpath=[os.path.dirname(location)])
+ source = self._plugin_base.make_plugin_source(searchpath=[os.path.dirname(location)], identifier='pip_plugin-' + self._identifier)
self._alternate_sources[('pip', package_name)] = source
else:
diff --git a/src/buildstream/_project.py b/src/buildstream/_project.py
index 1fdc84a..a2df5bce 100644
--- a/src/buildstream/_project.py
+++ b/src/buildstream/_project.py
@@ -86,6 +86,12 @@ class ProjectConfig:
self.default_mirror = None # The name of the preferred mirror.
self._aliases = {} # Aliases dictionary
+ # __getstate__()
+ #
+ # Pickle support. Returns a shallow copy of the instance dict without the
+ # element and source factories, so an unpickled ProjectConfig has no
+ # instance-level element_factory / source_factory.
+ #
+ # NOTE(review): _reduce_element() / _reduce_source() in job.py read these
+ # factories from project.config in the pickling process. Code in the
+ # unpickling process that reads them would fail; confirm this is intended.
+ #
+ def __getstate__(self):
+ state = self.__dict__.copy()
+ del state["element_factory"]
+ del state["source_factory"]
+ return state
+
# Project()
#
@@ -97,6 +103,8 @@ class Project():
default_mirror=None, parent_loader=None,
search_for_project=True):
+ self._pass = None
+
# The project name
self.name = None
@@ -622,6 +630,8 @@ class Project():
config_no_include = _yaml.node_copy(self._default_config_node)
_yaml.composite(config_no_include, project_conf_first_pass)
+ assert self._pass is None
+ self._pass = 1
self._load_pass(config_no_include, self.first_pass_config,
ignore_unknown=True)
@@ -646,6 +656,8 @@ class Project():
config = _yaml.node_copy(self._default_config_node)
_yaml.composite(config, project_conf_second_pass)
+ assert self._pass == 1
+ self._pass = 2
self._load_pass(config, self.config)
self._validate_node(config)
@@ -919,10 +931,12 @@ class Project():
pluginbase = PluginBase(package='buildstream.plugins')
output.element_factory = ElementFactory(pluginbase,
plugin_origins=plugin_element_origins,
- format_versions=element_format_versions)
+ format_versions=element_format_versions,
+ pass_=self._pass)
output.source_factory = SourceFactory(pluginbase,
plugin_origins=plugin_source_origins,
- format_versions=source_format_versions)
+ format_versions=source_format_versions,
+ pass_=self._pass)
# _store_origin()
#
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index e11a9f9..4ca2ee4 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -21,7 +21,10 @@
# System imports
import enum
+import copyreg
+import io
import os
+import pickle
import sys
import signal
import datetime
@@ -32,7 +35,7 @@ import multiprocessing
# BuildStream toplevel imports
from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
from ..._message import Message, MessageType, unconditional_messages
-from ... import _signals, utils
+from ... import _signals, utils, Plugin, Element, Source
# Return code values shutdown of job handling child processes
@@ -87,6 +90,80 @@ class _MessageType(enum.Enum):
SUBCLASS_CUSTOM_MESSAGE = 5
+# _reduce_element()
+#
+# Pickler reducer for Element subclasses; _pickle_child_job() registers it
+# per class in the pickler's dispatch_table. On unpickling, the element is
+# recreated by _unreduce_plugin(factory, meta_kind), using its project's
+# element factory. Pickle then applies the returned state, which is a copy of
+# the element's instance dict.
+#
+# Args:
+#    element (Element): The element being pickled
+#
+# Returns:
+#    (tuple): A (callable, args, state) reduce tuple
+#
+def _reduce_element(element):
+ assert isinstance(element, Element)
+ meta_kind = element._meta_kind
+ project = element._get_project()
+ factory = project.config.element_factory
+ args = (factory, meta_kind)
+ state = element.__dict__.copy()
+ # Reverse dependencies are not pickled, so the unpickled element gets no
+ # such attribute from this state.
+ del state["_Element__reverse_dependencies"]
+ return (_unreduce_plugin, args, state)
+
+
+# _reduce_source()
+#
+# Pickler reducer for Source subclasses; _pickle_child_job() registers it
+# per class in the pickler's dispatch_table. On unpickling, the source is
+# recreated by _unreduce_plugin(factory, meta_kind), using its project's
+# source factory. Pickle then applies a full copy of the source's instance
+# dict as its state.
+#
+# Args:
+#    source (Source): The source being pickled
+#
+# Returns:
+#    (tuple): A (callable, args, state) reduce tuple
+#
+def _reduce_source(source):
+ assert isinstance(source, Source)
+ meta_kind = source._meta_kind
+ project = source._get_project()
+ factory = project.config.source_factory
+ args = (factory, meta_kind)
+ return (_unreduce_plugin, args, source.__dict__.copy())
+
+
+# _unreduce_plugin()
+#
+# Reconstructor used by _reduce_element() and _reduce_source(). It looks up
+# the plugin class for meta_kind in the (unpickled) factory and creates a bare
+# instance with cls.__new__, without running __init__. It also keeps a
+# reference to the factory on the instance. Pickle then applies the saved
+# state to the returned object.
+#
+# Args:
+#    factory (PluginContext): The element or source factory to look up in
+#    meta_kind (str): The plugin kind recorded on the original plugin
+#
+# Returns:
+#    (Plugin): An uninitialised instance of the plugin class
+#
+def _unreduce_plugin(factory, meta_kind):
+ cls, _ = factory.lookup(meta_kind)
+ plugin = cls.__new__(cls)
+
+ # TODO: find a better way of persisting this factory, otherwise the plugin
+ # will become invalid.
+ plugin.factory = factory
+
+ return plugin
+
+
+# _pickle_child_job()
+#
+# Pickles a child job into an in-memory buffer. The pickler's dispatch_table
+# starts as a copy of the global copyreg table. Every Element / Source class
+# currently loaded in any project's factory (its `_types` cache) is then
+# mapped to _reduce_element / _reduce_source. Instances of classes not loaded
+# at this point are pickled with the default mechanism instead.
+#
+# Args:
+#    child_job: The job object to pickle
+#    context (Context): Supplies the projects whose factories are scanned
+#
+# Returns:
+#    (io.BytesIO): The pickled data, rewound to the start
+#
+def _pickle_child_job(child_job, context):
+
+ # Note: Another way of doing this would be to let PluginBase do its
+ # import-magic. We would achieve this by first pickling the factories, and
+ # the string names of their plugins. Unpickling the plugins in the child
+ # process would then "just work". There would be an additional cost of
+ # having to load every plugin kind, regardless of which ones are used.
+
+ projects = context.get_projects()
+ element_classes = [
+ cls
+ for p in projects
+ for cls, _ in p.config.element_factory._types.values()
+ ]
+ source_classes = [
+ cls
+ for p in projects
+ for cls, _ in p.config.source_factory._types.values()
+ ]
+
+ data = io.BytesIO()
+ pickler = pickle.Pickler(data)
+ pickler.dispatch_table = copyreg.dispatch_table.copy()
+ for cls in element_classes:
+ pickler.dispatch_table[cls] = _reduce_element
+ for cls in source_classes:
+ pickler.dispatch_table[cls] = _reduce_source
+ pickler.dump(child_job)
+ # Rewind so the caller can pass the buffer straight to pickle.load().
+ data.seek(0)
+
+ return data
+
+
+# _unpickle_child_job()
+#
+# Loads a child job from a binary file-like object, such as the buffer
+# returned by _pickle_child_job(). Reading starts from the object's current
+# position; no seek is done here.
+#
+# NOTE: pickle.load can execute arbitrary code, so only pass data produced
+# by _pickle_child_job(), never untrusted input.
+#
+def _unpickle_child_job(pickled):
+ child_job = pickle.load(pickled)
+ return child_job
+
+
+# _do_pickled_child_job()
+#
+# Unpickles a child job and returns the result of calling its
+# child_action(*child_args). Presumably this is meant as the entry point run
+# in the child process; confirm against the caller.
+#
+def _do_pickled_child_job(pickled, *child_args):
+ child_job = _unpickle_child_job(pickled)
+ return child_job.child_action(*child_args)
+
+
# Job()
#
# The Job object represents a task that will run in parallel to the main
diff --git a/src/buildstream/_sourcecache.py b/src/buildstream/_sourcecache.py
index ce0694e..96d874b 100644
--- a/src/buildstream/_sourcecache.py
+++ b/src/buildstream/_sourcecache.py
@@ -58,6 +58,12 @@ class SourceCache(BaseCache):
self.casquota.add_remove_callbacks(self.unrequired_sources, self.cas.remove)
self.casquota.add_list_refs_callback(self.list_sources)
+ # __getstate__()
+ #
+ # Pickle support. Returns a shallow copy of the instance dict in which
+ # '_required_sources' is replaced by an empty set. No Source objects are
+ # pickled via that attribute, so the unpickled cache starts with no
+ # required sources.
+ #
+ def __getstate__(self):
+ state = self.__dict__.copy()
+ # TODO: actually pickle the sources, resolving to the same objects.
+ state['_required_sources'] = set()
+ return state
+
# mark_required_sources()
#
# Mark sources that are required by the current run.
diff --git a/src/buildstream/_sourcefactory.py b/src/buildstream/_sourcefactory.py
index 1d959a1..eca4b50 100644
--- a/src/buildstream/_sourcefactory.py
+++ b/src/buildstream/_sourcefactory.py
@@ -33,9 +33,10 @@ class SourceFactory(PluginContext):
def __init__(self, plugin_base, *,
format_versions={},
- plugin_origins=None):
+ plugin_origins=None,
+ pass_=None):
- super().__init__(plugin_base, Source, [_site.source_plugins],
+ super().__init__(plugin_base, Source, [_site.source_plugins], 'source' + str(pass_),
format_versions=format_versions,
plugin_origins=plugin_origins)
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index a605460..359e183 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -186,6 +186,7 @@ class Element(Plugin):
"""
def __init__(self, context, project, meta, plugin_conf):
+ self._meta_kind = meta.kind
self.__cache_key_dict = None # Dict for cache key calculation
self.__cache_key = None # Our cached cache key
diff --git a/src/buildstream/source.py b/src/buildstream/source.py
index 9fc9cf1..90db130 100644
--- a/src/buildstream/source.py
+++ b/src/buildstream/source.py
@@ -312,6 +312,8 @@ class Source(Plugin):
super().__init__("{}-{}".format(meta.element_name, meta.element_index),
context, project, provenance, "source", unique_id=unique_id)
+ self._meta_kind = meta.kind
+
self.__source_cache = context.sourcecache
self.__element_name = meta.element_name # The name of the element owning this source