You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by gi...@apache.org on 2020/12/29 13:22:19 UTC

[buildstream] 29/33: WIP: pickle: use custom reduction for plugins

This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch aevri/picklable_jobs
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit f055036d4a43d409a4dc39515dd8a07206b7c1f8
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Wed Apr 10 10:50:26 2019 +0100

    WIP: pickle: use custom reduction for plugins
    
    This removes the need for a PicklablePluginProxy, and perhaps makes this
    easier to follow, as it is more direct.
---
 src/buildstream/_elementfactory.py     |  5 +--
 src/buildstream/_project.py            |  6 +++
 src/buildstream/_scheduler/jobs/job.py | 73 ++++++++++++++++++++++++++++++----
 src/buildstream/_sourcefactory.py      |  3 +-
 src/buildstream/plugin.py              |  3 --
 5 files changed, 75 insertions(+), 15 deletions(-)

diff --git a/src/buildstream/_elementfactory.py b/src/buildstream/_elementfactory.py
index 89ec03b..1a99e33 100644
--- a/src/buildstream/_elementfactory.py
+++ b/src/buildstream/_elementfactory.py
@@ -63,6 +63,5 @@ class ElementFactory(PluginContext):
         element = element_type(context, project, meta, default_config)
         version = self._format_versions.get(meta.kind, 0)
         self._assert_plugin_format(element, version)
-        proxy = PicklablePluginProxy(element, self, meta.kind)
-        element._setup_artifact(proxy, context)
-        return proxy
+        element._setup_artifact(element, context)
+        return element
diff --git a/src/buildstream/_project.py b/src/buildstream/_project.py
index 526bb8d..2a8f632 100644
--- a/src/buildstream/_project.py
+++ b/src/buildstream/_project.py
@@ -87,6 +87,12 @@ class ProjectConfig:
         self.default_mirror = None               # The name of the preferred mirror.
         self._aliases = {}                       # Aliases dictionary
 
+    def __getstate__(self):
+        state = self.__dict__.copy()
+        del state["element_factory"]
+        del state["source_factory"]
+        return state
+
 
 # Project()
 #
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index d265f45..76cbe0d 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -20,6 +20,7 @@
 #        Tristan Maat <tr...@codethink.co.uk>
 
 # System imports
+import copyreg
 import io
 import os
 import pickle
@@ -33,7 +34,7 @@ import multiprocessing
 # BuildStream toplevel imports
 from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
 from ..._message import Message, MessageType, unconditional_messages
-from ... import _signals, utils
+from ... import _signals, utils, Plugin, Element, Source
 
 # Return code values shutdown of job handling child processes
 #
@@ -86,19 +87,77 @@ class Process(multiprocessing.Process):
         self._sentinel = self._popen.sentinel
 
 
-def _pickle_child_job(child_job):
+def _reduce_element(element):
+    assert isinstance(element, Element)
+    meta_kind = element._meta_kind
+    project = element._get_project()
+    factory = project.config.element_factory
+    args = (factory, meta_kind)
+    state = element.__dict__.copy()
+    del state["_Element__reverse_dependencies"]
+    return (_unreduce_plugin, args, state)
+
+
+def _reduce_source(source):
+    assert isinstance(source, Source)
+    meta_kind = source._meta_kind
+    project = source._get_project()
+    factory = project.config.source_factory
+    args = (factory, meta_kind)
+    return (_unreduce_plugin, args, source.__dict__.copy())
+
+
+def _unreduce_plugin(factory, meta_kind):
+    cls, _ = factory.lookup(meta_kind)
+    plugin = cls.__new__(cls)
+
+    # TODO: find a better way of persisting this factory, otherwise the plugin
+    # will become invalid.
+    plugin.factory = factory
+
+    return plugin
+
+
+def _pickle_child_job(child_job, context):
+
+    # Note: Another way of doing this would be to let PluginBase do its
+    # import-magic. We would achieve this by first pickling the factories, and
+    # the string names of their plugins. Unpickling the plugins in the child
+    # process would then "just work". There would be an additional cost of
+    # having to load every plugin kind, regardless of which ones are used.
+
+    projects = context.get_projects()
+    element_classes = [
+        cls
+        for p in projects
+        for cls, _ in p.config.element_factory._types.values()
+    ]
+    source_classes = [
+        cls
+        for p in projects
+        for cls, _ in p.config.source_factory._types.values()
+    ]
+
     data = io.BytesIO()
-    pickle.dump(child_job, data)
+    pickler = pickle.Pickler(data)
+    pickler.dispatch_table = copyreg.dispatch_table.copy()
+    for cls in element_classes:
+        pickler.dispatch_table[cls] = _reduce_element
+    for cls in source_classes:
+        pickler.dispatch_table[cls] = _reduce_source
+    pickler.dump(child_job)
     data.seek(0)
+
     return data
 
 
 def _unpickle_child_job(pickled):
-    return pickle.load(pickled)
+    child_job = pickle.load(pickled)
+    return child_job
 
 
 def _do_pickled_child_job(pickled, *child_args):
-    child_job = pickle.load(pickled)
+    child_job = _unpickle_child_job(pickled)
 
     # Spawn the process
     #
@@ -184,11 +243,11 @@ class Job():
             print(f"({now - then:,.2}s):", message)
 
         # import buildstream.testpickle
-        # pickled_process = buildstream.testpickle.test_pickle_direct(child_job)
+        # buildstream.testpickle.test_pickle(child_job)
 
         # Spawn the process
         with timer(f"Pickle {child_job}"):
-            pickled = _pickle_child_job(child_job)
+            pickled = _pickle_child_job(child_job, self._scheduler.context)
         print(f"Size of pickled data: {len(pickled.getbuffer()):,}")
         self._process = Process(
             target=_do_pickled_child_job,
diff --git a/src/buildstream/_sourcefactory.py b/src/buildstream/_sourcefactory.py
index 46dc24f..a9f3ff6 100644
--- a/src/buildstream/_sourcefactory.py
+++ b/src/buildstream/_sourcefactory.py
@@ -62,5 +62,4 @@ class SourceFactory(PluginContext):
         source = source_type(context, project, meta)
         version = self._format_versions.get(meta.kind, 0)
         self._assert_plugin_format(source, version)
-        proxy = PicklablePluginProxy(source, self, meta.kind)
-        return proxy
+        return source
diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py
index 539d61f..56f4666 100644
--- a/src/buildstream/plugin.py
+++ b/src/buildstream/plugin.py
@@ -255,9 +255,6 @@ class Plugin():
     def __getstate__(self):
         raise NotImplementedError("Don't pickle this.")
 
-    def __setstate__(self, state):
-        raise NotImplementedError("Don't pickle this.")
-
     def __del__(self):
         # Dont send anything through the Message() pipeline at destruction time,
         # any subsequent lookup of plugin by unique id would raise KeyError.