You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by tv...@apache.org on 2021/02/04 07:19:08 UTC

[buildstream] 09/21: _scheduler: don't pass whole queue to child job

This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch aevri/win32
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit db07192c18067f9f84cc686e975a4909c166a9c9
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Tue Apr 9 13:31:39 2019 +0100

    _scheduler: don't pass whole queue to child job
    
    Stop passing the scheduler's job queues across to child jobs via the
    'action_cb' parameter. Instead, pass a module-level function, which will
    pickle nicely.
    
    This isn't much of a problem while we are in the 'fork' multiprocessing
    model. As we move towards supporting the 'spawn' model for win32, we
    need to consider what we will be pickling and unpickling to cross the
    process boundary.
---
 .../_scheduler/queues/artifactpushqueue.py         | 11 ++++++----
 src/buildstream/_scheduler/queues/buildqueue.py    | 10 ++++++---
 src/buildstream/_scheduler/queues/fetchqueue.py    | 15 +++++++++++--
 src/buildstream/_scheduler/queues/pullqueue.py     | 11 ++++++----
 src/buildstream/_scheduler/queues/queue.py         | 25 +++++++++++++---------
 .../_scheduler/queues/sourcepushqueue.py           | 11 ++++++----
 src/buildstream/_scheduler/queues/trackqueue.py    |  8 +++++--
 7 files changed, 62 insertions(+), 29 deletions(-)

diff --git a/src/buildstream/_scheduler/queues/artifactpushqueue.py b/src/buildstream/_scheduler/queues/artifactpushqueue.py
index b861d4f..0b6fa13 100644
--- a/src/buildstream/_scheduler/queues/artifactpushqueue.py
+++ b/src/buildstream/_scheduler/queues/artifactpushqueue.py
@@ -32,13 +32,16 @@ class ArtifactPushQueue(Queue):
     complete_name = "Pushed"
     resources = [ResourceType.UPLOAD]
 
-    def process(self, element):
-        # returns whether an artifact was uploaded or not
-        if not element._push():
-            raise SkipJob(self.action_name)
+    def get_process_func(self):
+        return _raise_skip_if_not_pushed
 
     def status(self, element):
         if element._skip_push():
             return QueueStatus.SKIP
 
         return QueueStatus.READY
+
+
+def _raise_skip_if_not_pushed(element):
+    if not element._push():  # _push() returns whether an artifact was uploaded
+        raise SkipJob(ArtifactPushQueue.action_name)
diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py
index ff65158..d0796f9 100644
--- a/src/buildstream/_scheduler/queues/buildqueue.py
+++ b/src/buildstream/_scheduler/queues/buildqueue.py
@@ -57,7 +57,7 @@ class BuildQueue(Queue):
                           logfile=logfile)
             job = ElementJob(self._scheduler, self.action_name,
                              logfile, element=element, queue=self,
-                             action_cb=self.process,
+                             action_cb=self.get_process_func(),
                              complete_cb=self._job_done,
                              max_retries=self._max_retries)
             self._done_queue.append(element)
@@ -66,8 +66,8 @@ class BuildQueue(Queue):
 
         return super().enqueue(to_queue)
 
-    def process(self, element):
-        return element._assemble()
+    def get_process_func(self):
+        return _assemble_element  # module-level, so it pickles without the queue
 
     def status(self, element):
         if element._cached_success():
@@ -116,3 +116,7 @@ class BuildQueue(Queue):
         # Set a "buildable" callback for an element not yet ready
         # to be processed in the build queue.
         element._set_buildable_callback(self._enqueue_element)
+
+
+def _assemble_element(element):
+    return element._assemble()
diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py
index bbb3b3d..9b619e7 100644
--- a/src/buildstream/_scheduler/queues/fetchqueue.py
+++ b/src/buildstream/_scheduler/queues/fetchqueue.py
@@ -41,8 +41,11 @@ class FetchQueue(Queue):
         self._skip_cached = skip_cached
         self._fetch_original = fetch_original
 
-    def process(self, element):
-        element._fetch(fetch_original=self._fetch_original)
+    def get_process_func(self):
+        if self._fetch_original:  # resolve the flag here so the child job needs no queue state
+            return _fetch_original
+        else:
+            return _fetch_no_original
 
     def status(self, element):
         # Optionally skip elements that are already in the artifact cache
@@ -78,3 +81,11 @@ class FetchQueue(Queue):
         # Set a "can_query_cache" callback for an element not yet ready
         # to be processed in the fetch queue.
         element._set_can_query_cache_callback(self._enqueue_element)
+
+
+def _fetch_no_original(element):
+    element._fetch(fetch_original=False)
+
+
+def _fetch_original(element):
+    element._fetch(fetch_original=True)
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index 2452933..dfb7cbc 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -33,10 +33,8 @@ class PullQueue(Queue):
     complete_name = "Pulled"
     resources = [ResourceType.DOWNLOAD, ResourceType.CACHE]
 
-    def process(self, element):
-        # returns whether an artifact was downloaded or not
-        if not element._pull():
-            raise SkipJob(self.action_name)
+    def get_process_func(self):
+        return _raise_skip_if_not_pulled
 
     def status(self, element):
         if not element._can_query_cache():
@@ -65,3 +63,8 @@ class PullQueue(Queue):
         # immediately ready to query the artifact cache so that it
         # may be pulled.
         element._set_can_query_cache_callback(self._enqueue_element)
+
+
+def _raise_skip_if_not_pulled(element):
+    if not element._pull():  # _pull() returns whether an artifact was downloaded
+        raise SkipJob(PullQueue.action_name)
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 9a07f63..be76e9e 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -91,20 +91,25 @@ class Queue():
     #     Abstract Methods for Queue implementations    #
     #####################################################
 
-    # process()
+    # get_process_func()
     #
-    # Abstract method for processing an element
+    # Abstract method, returns a callable for processing an element.
     #
-    # Args:
-    #    element (Element): An element to process
+    # The callable should fit the signature `process(element: Element) -> any`.
     #
-    # Returns:
-    #    (any): An optional something to be returned
-    #           for every element successfully processed
+    # Note that the callable may be executed in a child process, so it should
+    # pickle cleanly (e.g. a module-level function, not a bound method of the
+    # queue). Its return value is sent back to the main process, so it should
+    # be simple and pickle-able: strings, lists, dicts, numbers, not Elements.
     #
+    # This method is the only way for a queue to affect elements, and so is
+    # not optional to implement.
     #
-    def process(self, element):
-        pass
+    # Returns:
+    #    (Callable[[Element], Any]): The callable for processing elements.
+    #
+    def get_process_func(self):
+        raise NotImplementedError()
 
     # status()
     #
@@ -218,7 +223,7 @@ class Queue():
             ElementJob(self._scheduler, self.action_name,
                        self._element_log_path(element),
                        element=element, queue=self,
-                       action_cb=self.process,
+                       action_cb=self.get_process_func(),
                        complete_cb=self._job_done,
                        max_retries=self._max_retries)
             for element in ready
diff --git a/src/buildstream/_scheduler/queues/sourcepushqueue.py b/src/buildstream/_scheduler/queues/sourcepushqueue.py
index c38460e..92587d6 100644
--- a/src/buildstream/_scheduler/queues/sourcepushqueue.py
+++ b/src/buildstream/_scheduler/queues/sourcepushqueue.py
@@ -30,13 +30,16 @@ class SourcePushQueue(Queue):
     complete_name = "Sources pushed"
     resources = [ResourceType.UPLOAD]
 
-    def process(self, element):
-        # Returns whether a source was pushed or not
-        if not element._source_push():
-            raise SkipJob(self.action_name)
+    def get_process_func(self):
+        return _raise_skip_if_not_pushed
 
     def status(self, element):
         if element._skip_source_push():
             return QueueStatus.SKIP
 
         return QueueStatus.READY
+
+
+def _raise_skip_if_not_pushed(element):
+    if not element._source_push():  # _source_push() returns whether a source was pushed
+        raise SkipJob(SourcePushQueue.action_name)
diff --git a/src/buildstream/_scheduler/queues/trackqueue.py b/src/buildstream/_scheduler/queues/trackqueue.py
index 194bb7e..56fa9c5 100644
--- a/src/buildstream/_scheduler/queues/trackqueue.py
+++ b/src/buildstream/_scheduler/queues/trackqueue.py
@@ -35,8 +35,8 @@ class TrackQueue(Queue):
     complete_name = "Tracked"
     resources = [ResourceType.DOWNLOAD]
 
-    def process(self, element):
-        return element._track()
+    def get_process_func(self):
+        return _track_element  # module-level, so it pickles without the queue
 
     def status(self, element):
         # We can skip elements entirely if they have no sources.
@@ -60,3 +60,7 @@ class TrackQueue(Queue):
             source._set_ref(new_ref, save=True)
 
         element._tracking_done()
+
+
+def _track_element(element):
+    return element._track()