You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@buildstream.apache.org by gi...@apache.org on 2020/12/29 13:22:19 UTC
[buildstream] 29/33: WIP: pickle: use custom reduction for plugins
This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch aevri/picklable_jobs
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit f055036d4a43d409a4dc39515dd8a07206b7c1f8
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Wed Apr 10 10:50:26 2019 +0100
WIP: pickle: use custom reduction for plugins
This removes the need for a PicklablePluginProxy, and perhaps makes the pickling
code easier to follow, as plugins are now reduced directly via a custom dispatch table.
---
src/buildstream/_elementfactory.py | 5 +--
src/buildstream/_project.py | 6 +++
src/buildstream/_scheduler/jobs/job.py | 73 ++++++++++++++++++++++++++++++----
src/buildstream/_sourcefactory.py | 3 +-
src/buildstream/plugin.py | 3 --
5 files changed, 75 insertions(+), 15 deletions(-)
diff --git a/src/buildstream/_elementfactory.py b/src/buildstream/_elementfactory.py
index 89ec03b..1a99e33 100644
--- a/src/buildstream/_elementfactory.py
+++ b/src/buildstream/_elementfactory.py
@@ -63,6 +63,5 @@ class ElementFactory(PluginContext):
element = element_type(context, project, meta, default_config)
version = self._format_versions.get(meta.kind, 0)
self._assert_plugin_format(element, version)
- proxy = PicklablePluginProxy(element, self, meta.kind)
- element._setup_artifact(proxy, context)
- return proxy
+ element._setup_artifact(element, context)
+ return element
diff --git a/src/buildstream/_project.py b/src/buildstream/_project.py
index 526bb8d..2a8f632 100644
--- a/src/buildstream/_project.py
+++ b/src/buildstream/_project.py
@@ -87,6 +87,12 @@ class ProjectConfig:
self.default_mirror = None # The name of the preferred mirror.
self._aliases = {} # Aliases dictionary
+ def __getstate__(self):
+ state = self.__dict__.copy()
+ del state["element_factory"]
+ del state["source_factory"]
+ return state
+
# Project()
#
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index d265f45..76cbe0d 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -20,6 +20,7 @@
# Tristan Maat <tr...@codethink.co.uk>
# System imports
+import copyreg
import io
import os
import pickle
@@ -33,7 +34,7 @@ import multiprocessing
# BuildStream toplevel imports
from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
from ..._message import Message, MessageType, unconditional_messages
-from ... import _signals, utils
+from ... import _signals, utils, Plugin, Element, Source
# Return code values shutdown of job handling child processes
#
@@ -86,19 +87,77 @@ class Process(multiprocessing.Process):
self._sentinel = self._popen.sentinel
-def _pickle_child_job(child_job):
+def _reduce_element(element):
+ assert isinstance(element, Element)
+ meta_kind = element._meta_kind
+ project = element._get_project()
+ factory = project.config.element_factory
+ args = (factory, meta_kind)
+ state = element.__dict__.copy()
+ del state["_Element__reverse_dependencies"]
+ return (_unreduce_plugin, args, state)
+
+
+def _reduce_source(source):
+ assert isinstance(source, Source)
+ meta_kind = source._meta_kind
+ project = source._get_project()
+ factory = project.config.source_factory
+ args = (factory, meta_kind)
+ return (_unreduce_plugin, args, source.__dict__.copy())
+
+
+def _unreduce_plugin(factory, meta_kind):
+ cls, _ = factory.lookup(meta_kind)
+ plugin = cls.__new__(cls)
+
+ # TODO: find a better way of persisting this factory, otherwise the plugin
+ # will become invalid.
+ plugin.factory = factory
+
+ return plugin
+
+
+def _pickle_child_job(child_job, context):
+
+ # Note: Another way of doing this would be to let PluginBase do its
+ # import-magic. We would achieve this by first pickling the factories, and
+ # the string names of their plugins. Unpickling the plugins in the child
+ # process would then "just work". There would be an additional cost of
+ # having to load every plugin kind, regardless of which ones are used.
+
+ projects = context.get_projects()
+ element_classes = [
+ cls
+ for p in projects
+ for cls, _ in p.config.element_factory._types.values()
+ ]
+ source_classes = [
+ cls
+ for p in projects
+ for cls, _ in p.config.source_factory._types.values()
+ ]
+
data = io.BytesIO()
- pickle.dump(child_job, data)
+ pickler = pickle.Pickler(data)
+ pickler.dispatch_table = copyreg.dispatch_table.copy()
+ for cls in element_classes:
+ pickler.dispatch_table[cls] = _reduce_element
+ for cls in source_classes:
+ pickler.dispatch_table[cls] = _reduce_source
+ pickler.dump(child_job)
data.seek(0)
+
return data
def _unpickle_child_job(pickled):
- return pickle.load(pickled)
+ child_job = pickle.load(pickled)
+ return child_job
def _do_pickled_child_job(pickled, *child_args):
- child_job = pickle.load(pickled)
+ child_job = _unpickle_child_job(pickled)
# Spawn the process
#
@@ -184,11 +243,11 @@ class Job():
print(f"({now - then:,.2}s):", message)
# import buildstream.testpickle
- # pickled_process = buildstream.testpickle.test_pickle_direct(child_job)
+ # buildstream.testpickle.test_pickle(child_job)
# Spawn the process
with timer(f"Pickle {child_job}"):
- pickled = _pickle_child_job(child_job)
+ pickled = _pickle_child_job(child_job, self._scheduler.context)
print(f"Size of pickled data: {len(pickled.getbuffer()):,}")
self._process = Process(
target=_do_pickled_child_job,
diff --git a/src/buildstream/_sourcefactory.py b/src/buildstream/_sourcefactory.py
index 46dc24f..a9f3ff6 100644
--- a/src/buildstream/_sourcefactory.py
+++ b/src/buildstream/_sourcefactory.py
@@ -62,5 +62,4 @@ class SourceFactory(PluginContext):
source = source_type(context, project, meta)
version = self._format_versions.get(meta.kind, 0)
self._assert_plugin_format(source, version)
- proxy = PicklablePluginProxy(source, self, meta.kind)
- return proxy
+ return source
diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py
index 539d61f..56f4666 100644
--- a/src/buildstream/plugin.py
+++ b/src/buildstream/plugin.py
@@ -255,9 +255,6 @@ class Plugin():
def __getstate__(self):
raise NotImplementedError("Don't pickle this.")
- def __setstate__(self, state):
- raise NotImplementedError("Don't pickle this.")
-
def __del__(self):
# Dont send anything through the Message() pipeline at destruction time,
# any subsequent lookup of plugin by unique id would raise KeyError.