You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by tv...@apache.org on 2021/02/04 08:20:56 UTC

[buildstream] 26/33: WIP: perf: don't bind entire queue to callback

This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch aevri/picklable_jobs
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit fe0540800de89ab8d0de20c563a8134c2eae32b6
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Tue Apr 9 13:31:39 2019 +0100

    WIP: perf: don't bind entire queue to callback
---
 src/buildstream/_scheduler/queues/artifactpushqueue.py | 11 +++++++----
 src/buildstream/_scheduler/queues/buildqueue.py        | 10 +++++++---
 src/buildstream/_scheduler/queues/fetchqueue.py        | 15 +++++++++++++--
 src/buildstream/_scheduler/queues/pullqueue.py         | 11 +++++++----
 src/buildstream/_scheduler/queues/queue.py             | 18 +++---------------
 src/buildstream/_scheduler/queues/sourcepushqueue.py   | 11 +++++++----
 src/buildstream/_scheduler/queues/trackqueue.py        |  8 ++++++--
 7 files changed, 50 insertions(+), 34 deletions(-)

diff --git a/src/buildstream/_scheduler/queues/artifactpushqueue.py b/src/buildstream/_scheduler/queues/artifactpushqueue.py
index 5e9a7e8..dc6300e 100644
--- a/src/buildstream/_scheduler/queues/artifactpushqueue.py
+++ b/src/buildstream/_scheduler/queues/artifactpushqueue.py
@@ -38,13 +38,16 @@ class ArtifactPushQueue(Queue):
         del state['_scheduler']
         return state
 
-    def process(self, element):
-        # returns whether an artifact was uploaded or not
-        if not element._push():
-            raise SkipJob(self.action_name)
+    def get_process_func(self):
+        return _raise_skip_if_not_pushed
 
     def status(self, element):
         if element._skip_push():
             return QueueStatus.SKIP
 
         return QueueStatus.READY
+
+
+def _raise_skip_if_not_pushed(element):
+    if not element._push():
+        raise SkipJob(ArtifactPushQueue.action_name)
diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py
index 34394fa..5336416 100644
--- a/src/buildstream/_scheduler/queues/buildqueue.py
+++ b/src/buildstream/_scheduler/queues/buildqueue.py
@@ -63,7 +63,7 @@ class BuildQueue(Queue):
                           logfile=logfile)
             job = ElementJob(self._scheduler, self.action_name,
                              logfile, element=element, queue=self,
-                             action_cb=self.process,
+                             action_cb=self.get_process_func(),
                              complete_cb=self._job_done,
                              max_retries=self._max_retries)
             self._done_queue.append(element)
@@ -72,8 +72,8 @@ class BuildQueue(Queue):
 
         return super().enqueue(to_queue)
 
-    def process(self, element):
-        return element._assemble()
+    def get_process_func(self):
+        return _assemble_element
 
     def status(self, element):
         if not element._is_required():
@@ -122,3 +122,7 @@ class BuildQueue(Queue):
 
         # if status == JobStatus.OK:
         #     self._check_cache_size(job, element, result)
+
+
+def _assemble_element(element):
+    return element._assemble()
diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py
index 50ad2b8..7ee962f 100644
--- a/src/buildstream/_scheduler/queues/fetchqueue.py
+++ b/src/buildstream/_scheduler/queues/fetchqueue.py
@@ -47,8 +47,11 @@ class FetchQueue(Queue):
         self._skip_cached = skip_cached
         self._fetch_original = fetch_original
 
-    def process(self, element):
-        element._fetch(fetch_original=self._fetch_original)
+    def get_process_func(self):
+        if self._fetch_original:
+            return _fetch_original
+        else:
+            return _fetch_no_original
 
     def status(self, element):
         if not element._is_required():
@@ -84,3 +87,11 @@ class FetchQueue(Queue):
             assert element._get_consistency() == Consistency.CACHED
         else:
             assert element._source_cached()
+
+
+def _fetch_no_original(element):
+    element._fetch(fetch_original=False)
+
+
+def _fetch_original(element):
+    element._fetch(fetch_original=True)
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index 013ee64..c54c4f5 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -33,10 +33,8 @@ class PullQueue(Queue):
     complete_name = "Pulled"
     resources = [ResourceType.DOWNLOAD, ResourceType.CACHE]
 
-    def process(self, element):
-        # returns whether an artifact was downloaded or not
-        if not element._pull():
-            raise SkipJob(self.action_name)
+    def get_process_func(self):
+        return _raise_skip_if_not_pulled
 
     def status(self, element):
         if not element._is_required():
@@ -64,3 +62,8 @@ class PullQueue(Queue):
         # actually check the cache size.
         if status == JobStatus.OK:
             self._scheduler.check_cache_size()
+
+
+def _raise_skip_if_not_pulled(element):
+    if not element._pull():
+        raise SkipJob(PullQueue.action_name)
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 1efcffc..cb2ea82 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -88,20 +88,8 @@ class Queue():
     #     Abstract Methods for Queue implementations    #
     #####################################################
 
-    # process()
-    #
-    # Abstract method for processing an element
-    #
-    # Args:
-    #    element (Element): An element to process
-    #
-    # Returns:
-    #    (any): An optional something to be returned
-    #           for every element successfully processed
-    #
-    #
-    def process(self, element):
-        pass
+    def get_process_func(self):
+        raise NotImplementedError()
 
     # status()
     #
@@ -215,7 +203,7 @@ class Queue():
             ElementJob(self._scheduler, self.action_name,
                        self._element_log_path(element),
                        element=element, queue=self,
-                       action_cb=self.process,
+                       action_cb=self.get_process_func(),
                        complete_cb=self._job_done,
                        max_retries=self._max_retries)
             for element in ready
diff --git a/src/buildstream/_scheduler/queues/sourcepushqueue.py b/src/buildstream/_scheduler/queues/sourcepushqueue.py
index c38460e..92587d6 100644
--- a/src/buildstream/_scheduler/queues/sourcepushqueue.py
+++ b/src/buildstream/_scheduler/queues/sourcepushqueue.py
@@ -30,13 +30,16 @@ class SourcePushQueue(Queue):
     complete_name = "Sources pushed"
     resources = [ResourceType.UPLOAD]
 
-    def process(self, element):
-        # Returns whether a source was pushed or not
-        if not element._source_push():
-            raise SkipJob(self.action_name)
+    def get_process_func(self):
+        return _raise_skip_if_not_pushed
 
     def status(self, element):
         if element._skip_source_push():
             return QueueStatus.SKIP
 
         return QueueStatus.READY
+
+
+def _raise_skip_if_not_pushed(element):
+    if not element._source_push():
+        raise SkipJob(SourcePushQueue.action_name)
diff --git a/src/buildstream/_scheduler/queues/trackqueue.py b/src/buildstream/_scheduler/queues/trackqueue.py
index 72a79a5..4048072 100644
--- a/src/buildstream/_scheduler/queues/trackqueue.py
+++ b/src/buildstream/_scheduler/queues/trackqueue.py
@@ -35,8 +35,8 @@ class TrackQueue(Queue):
     complete_name = "Tracked"
     resources = [ResourceType.DOWNLOAD]
 
-    def process(self, element):
-        return element._track()
+    def get_process_func(self):
+        return _track_element
 
     def status(self, element):
         # We can skip elements entirely if they have no sources.
@@ -60,3 +60,7 @@ class TrackQueue(Queue):
             source._set_ref(new_ref, save=True)
 
         element._tracking_done()
+
+
+def _track_element(element):
+    return element._track()