You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by tv...@apache.org on 2021/02/04 08:20:56 UTC
[buildstream] 26/33: WIP: perf: don't bind entire queue to callback
This is an automated email from the ASF dual-hosted git repository.
tvb pushed a commit to branch aevri/picklable_jobs
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit fe0540800de89ab8d0de20c563a8134c2eae32b6
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Tue Apr 9 13:31:39 2019 +0100
WIP: perf: don't bind entire queue to callback
---
src/buildstream/_scheduler/queues/artifactpushqueue.py | 11 +++++++----
src/buildstream/_scheduler/queues/buildqueue.py | 10 +++++++---
src/buildstream/_scheduler/queues/fetchqueue.py | 15 +++++++++++++--
src/buildstream/_scheduler/queues/pullqueue.py | 11 +++++++----
src/buildstream/_scheduler/queues/queue.py | 18 +++---------------
src/buildstream/_scheduler/queues/sourcepushqueue.py | 11 +++++++----
src/buildstream/_scheduler/queues/trackqueue.py | 8 ++++++--
7 files changed, 50 insertions(+), 34 deletions(-)
diff --git a/src/buildstream/_scheduler/queues/artifactpushqueue.py b/src/buildstream/_scheduler/queues/artifactpushqueue.py
index 5e9a7e8..dc6300e 100644
--- a/src/buildstream/_scheduler/queues/artifactpushqueue.py
+++ b/src/buildstream/_scheduler/queues/artifactpushqueue.py
@@ -38,13 +38,16 @@ class ArtifactPushQueue(Queue):
del state['_scheduler']
return state
- def process(self, element):
- # returns whether an artifact was uploaded or not
- if not element._push():
- raise SkipJob(self.action_name)
+ def get_process_func(self):
+ return _raise_skip_if_not_pushed
def status(self, element):
if element._skip_push():
return QueueStatus.SKIP
return QueueStatus.READY
+
+
+def _raise_skip_if_not_pushed(element):
+ if not element._push():
+ raise SkipJob(ArtifactPushQueue.action_name)
diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py
index 34394fa..5336416 100644
--- a/src/buildstream/_scheduler/queues/buildqueue.py
+++ b/src/buildstream/_scheduler/queues/buildqueue.py
@@ -63,7 +63,7 @@ class BuildQueue(Queue):
logfile=logfile)
job = ElementJob(self._scheduler, self.action_name,
logfile, element=element, queue=self,
- action_cb=self.process,
+ action_cb=self.get_process_func(),
complete_cb=self._job_done,
max_retries=self._max_retries)
self._done_queue.append(element)
@@ -72,8 +72,8 @@ class BuildQueue(Queue):
return super().enqueue(to_queue)
- def process(self, element):
- return element._assemble()
+ def get_process_func(self):
+ return _assemble_element
def status(self, element):
if not element._is_required():
@@ -122,3 +122,7 @@ class BuildQueue(Queue):
# if status == JobStatus.OK:
# self._check_cache_size(job, element, result)
+
+
+def _assemble_element(element):
+ return element._assemble()
diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py
index 50ad2b8..7ee962f 100644
--- a/src/buildstream/_scheduler/queues/fetchqueue.py
+++ b/src/buildstream/_scheduler/queues/fetchqueue.py
@@ -47,8 +47,11 @@ class FetchQueue(Queue):
self._skip_cached = skip_cached
self._fetch_original = fetch_original
- def process(self, element):
- element._fetch(fetch_original=self._fetch_original)
+ def get_process_func(self):
+ if self._fetch_original:
+ return _fetch_original
+ else:
+ return _fetch_no_original
def status(self, element):
if not element._is_required():
@@ -84,3 +87,11 @@ class FetchQueue(Queue):
assert element._get_consistency() == Consistency.CACHED
else:
assert element._source_cached()
+
+
+def _fetch_no_original(element):
+ element._fetch(fetch_original=False)
+
+
+def _fetch_original(element):
+ element._fetch(fetch_original=True)
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index 013ee64..c54c4f5 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -33,10 +33,8 @@ class PullQueue(Queue):
complete_name = "Pulled"
resources = [ResourceType.DOWNLOAD, ResourceType.CACHE]
- def process(self, element):
- # returns whether an artifact was downloaded or not
- if not element._pull():
- raise SkipJob(self.action_name)
+ def get_process_func(self):
+ return _raise_skip_if_not_pulled
def status(self, element):
if not element._is_required():
@@ -64,3 +62,8 @@ class PullQueue(Queue):
# actually check the cache size.
if status == JobStatus.OK:
self._scheduler.check_cache_size()
+
+
+def _raise_skip_if_not_pulled(element):
+ if not element._pull():
+ raise SkipJob(PullQueue.action_name)
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 1efcffc..cb2ea82 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -88,20 +88,8 @@ class Queue():
# Abstract Methods for Queue implementations #
#####################################################
- # process()
- #
- # Abstract method for processing an element
- #
- # Args:
- # element (Element): An element to process
- #
- # Returns:
- # (any): An optional something to be returned
- # for every element successfully processed
- #
- #
- def process(self, element):
- pass
+ def get_process_func(self):
+ raise NotImplementedError()
# status()
#
@@ -215,7 +203,7 @@ class Queue():
ElementJob(self._scheduler, self.action_name,
self._element_log_path(element),
element=element, queue=self,
- action_cb=self.process,
+ action_cb=self.get_process_func(),
complete_cb=self._job_done,
max_retries=self._max_retries)
for element in ready
diff --git a/src/buildstream/_scheduler/queues/sourcepushqueue.py b/src/buildstream/_scheduler/queues/sourcepushqueue.py
index c38460e..92587d6 100644
--- a/src/buildstream/_scheduler/queues/sourcepushqueue.py
+++ b/src/buildstream/_scheduler/queues/sourcepushqueue.py
@@ -30,13 +30,16 @@ class SourcePushQueue(Queue):
complete_name = "Sources pushed"
resources = [ResourceType.UPLOAD]
- def process(self, element):
- # Returns whether a source was pushed or not
- if not element._source_push():
- raise SkipJob(self.action_name)
+ def get_process_func(self):
+ return _raise_skip_if_not_pushed
def status(self, element):
if element._skip_source_push():
return QueueStatus.SKIP
return QueueStatus.READY
+
+
+def _raise_skip_if_not_pushed(element):
+ if not element._source_push():
+ raise SkipJob(SourcePushQueue.action_name)
diff --git a/src/buildstream/_scheduler/queues/trackqueue.py b/src/buildstream/_scheduler/queues/trackqueue.py
index 72a79a5..4048072 100644
--- a/src/buildstream/_scheduler/queues/trackqueue.py
+++ b/src/buildstream/_scheduler/queues/trackqueue.py
@@ -35,8 +35,8 @@ class TrackQueue(Queue):
complete_name = "Tracked"
resources = [ResourceType.DOWNLOAD]
- def process(self, element):
- return element._track()
+ def get_process_func(self):
+ return _track_element
def status(self, element):
# We can skip elements entirely if they have no sources.
@@ -60,3 +60,7 @@ class TrackQueue(Queue):
source._set_ref(new_ref, save=True)
element._tracking_done()
+
+
+def _track_element(element):
+ return element._track()