You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by ro...@apache.org on 2020/12/29 13:50:01 UTC
[buildstream] branch tpollard/notificationhandlertmp created (now 0106e1b)
This is an automated email from the ASF dual-hosted git repository.
root pushed a change to branch tpollard/notificationhandlertmp
in repository https://gitbox.apache.org/repos/asf/buildstream.git.
at 0106e1b switching to queue
This branch includes the following new commits:
new 7f7c55f WIP: Refactor scheduler-frontend communication
new 8f844bc Add workaround for buildqueue job_complete direct callback
new c6420d4 Refer to stream-scheduler communication as notifications
new 998fd07 fixups
new 0106e1b switching to queue
The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[buildstream] 01/05: WIP: Refactor scheduler-frontend communication
Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
root pushed a commit to branch tpollard/notificationhandlertmp
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 7f7c55f040b8bc42c5c66f0cbbb173d09bef856e
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Thu Jun 13 17:47:04 2019 +0100
WIP: Refactor scheduler-frontend communication
---
src/buildstream/_frontend/app.py | 2 +
src/buildstream/_scheduler/__init__.py | 2 +-
src/buildstream/_scheduler/scheduler.py | 72 +++++++++++++++++++++++++--------
src/buildstream/_stream.py | 25 +++++++++++-
4 files changed, 81 insertions(+), 20 deletions(-)
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 87575b6..7fe71c5 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -603,6 +603,7 @@ class App():
pass
return
+ assert False
# Interactive mode for element failures
with self._interrupted():
@@ -642,6 +643,7 @@ class App():
# Handle choices which you can come back from
#
+ assert choice != 'shell' # This won't work for now
if choice == 'shell':
click.echo("\nDropping into an interactive shell in the failed build sandbox\n", err=True)
try:
diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py
index d2f458f..d689d6e 100644
--- a/src/buildstream/_scheduler/__init__.py
+++ b/src/buildstream/_scheduler/__init__.py
@@ -26,5 +26,5 @@ from .queues.buildqueue import BuildQueue
from .queues.artifactpushqueue import ArtifactPushQueue
from .queues.pullqueue import PullQueue
-from .scheduler import Scheduler, SchedStatus
+from .scheduler import Scheduler, SchedStatus, Notification, NotificationType
from .jobs import ElementJob, JobStatus
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 9d7cf5d..0ed5ada 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -21,6 +21,7 @@
# System imports
import os
import asyncio
+import enum
from itertools import chain
import signal
import datetime
@@ -45,6 +46,32 @@ _ACTION_NAME_CLEANUP = 'clean'
_ACTION_NAME_CACHE_SIZE = 'size'
+@enum.unique
+class NotificationType(enum.Enum):
+ INTERRUPT = "interrupt"
+ JOB_START = "job_start"
+ JOB_COMPLETE = "job_complete"
+ TICK = "tick"
+
+
+class Notification:
+
+ def __init__(self,
+ notification_type,
+ *,
+ full_name=None,
+ job_action=None,
+ job_status=None,
+ elapsed_time=None,
+ element=None):
+ self.notification_type = notification_type
+ self.full_name = full_name
+ self.job_action = job_action
+ self.job_status = job_status
+ self.elapsed_time = elapsed_time
+ self.element = element
+
+
# Scheduler()
#
# The scheduler operates on a list queues, each of which is meant to accomplish
@@ -68,7 +95,7 @@ _ACTION_NAME_CACHE_SIZE = 'size'
class Scheduler():
def __init__(self, context,
- start_time, state,
+ start_time, state, message_handler,
interrupt_callback=None,
ticker_callback=None):
@@ -99,9 +126,17 @@ class Scheduler():
self._cleanup_scheduled = False # Whether we have a cleanup job scheduled
self._cleanup_running = None # A running CleanupJob, or None
- # Callbacks to report back to the Scheduler owner
- self._interrupt_callback = interrupt_callback
- self._ticker_callback = ticker_callback
+ # Callback to send messages to report back to the Scheduler's owner
+ self.message = message_handler
+
+ # Whether our exclusive jobs, like 'cleanup' are currently already
+ # waiting or active.
+ #
+ # This is just a bit quicker than scanning the wait queue and active
+ # queue and comparing job action names.
+ #
+ self._exclusive_waiting = set()
+ self._exclusive_active = set()
self.resources = Resources(context.sched_builders,
context.sched_fetchers,
@@ -131,8 +166,7 @@ class Scheduler():
asyncio.set_event_loop(self.loop)
# Add timeouts
- if self._ticker_callback:
- self.loop.call_later(1, self._tick)
+ self.loop.call_later(1, self._tick)
# Handle unix signals while running
self._connect_signals()
@@ -251,13 +285,17 @@ class Scheduler():
# Remove from the active jobs list
self._active_jobs.remove(job)
- self._state.remove_task(job.action_name, job.name)
if status == JobStatus.FAIL:
# If it's an elementjob, we want to compare against the failure messages
# and send the Element() instance. Note this will change if the frontend
# is run in a separate process for pickling
element = job.get_element()
- self._state.fail_task(job.action_name, job.name, element=element)
+ message = Notification(NotificationType.JOB_COMPLETE,
+ full_name=job.name,
+ job_action=job.action_name,
+ job_status=status,
+ element=element)
+ self.message(message)
# Now check for more jobs
self._sched()
@@ -316,7 +354,11 @@ class Scheduler():
#
def _start_job(self, job):
self._active_jobs.append(job)
- self._state.add_task(job.action_name, job.name, self.elapsed_time())
+ message = Notification(NotificationType.JOB_START,
+ full_name=job.name,
+ job_action=job.action_name,
+ elapsed_time=self.elapsed_time())
+ self.message(message)
job.start()
# Callback for the cache size job
@@ -535,13 +577,8 @@ class Scheduler():
if self.terminated:
return
- # Leave this to the frontend to decide, if no
- # interrrupt callback was specified, then just terminate.
- if self._interrupt_callback:
- self._interrupt_callback()
- else:
- # Default without a frontend is just terminate
- self.terminate_jobs()
+ message = Notification(NotificationType.INTERRUPT)
+ self.message(message)
# _terminate_event():
#
@@ -600,7 +637,8 @@ class Scheduler():
# Regular timeout for driving status in the UI
def _tick(self):
- self._ticker_callback()
+ message = Notification(NotificationType.TICK)
+ self.message(message)
self.loop.call_later(1, self._tick)
def __getstate__(self):
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index c54fee1..b09521b 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -36,7 +36,7 @@ from ._artifactelement import verify_artifact_ref, ArtifactElement
from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError
from ._message import Message, MessageType
from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
- SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue
+ SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, JobStatus
from ._pipeline import Pipeline, PipelineSelection
from ._profile import Topics, PROFILER
from ._state import State
@@ -83,11 +83,13 @@ class Stream():
context.messenger.set_state(self._state)
- self._scheduler = Scheduler(context, session_start, self._state,
+ self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler,
interrupt_callback=interrupt_callback,
ticker_callback=ticker_callback)
self._first_non_track_queue = None
self._session_start_callback = session_start_callback
+ self._ticker_callback = ticker_callback
+ self._interrupt_callback = interrupt_callback
# init()
#
@@ -1584,6 +1586,25 @@ class Stream():
return element_targets, artifact_refs
+ def _scheduler_notification_handler(self, notification):
+ if notification.notification_type == NotificationType.INTERRUPT:
+ self._interrupt_callback()
+ elif notification.notification_type == NotificationType.TICK:
+ self._ticker_callback()
+ elif notification.notification_type == NotificationType.JOB_START:
+ self._state.add_task(notification.job_action, notification.full_name, notification.elapsed_time)
+
+ elif notification.notification_type == NotificationType.JOB_COMPLETE:
+ self._state.remove_task(notification.job_action, notification.full_name)
+ if notification.job_status == JobStatus.FAIL:
+ if notification.element:
+ unique_id = notification.full_name
+ else:
+ unique_id = None
+ self._state.fail_task(notification.job_action, notification.full_name, unique_id)
+ else:
+ raise StreamError("Unreccognised notification type recieved")
+
def __getstate__(self):
# The only use-cases for pickling in BuildStream at the time of writing
# are enabling the 'spawn' method of starting child processes, and
[buildstream] 05/05: switching to queue
Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
root pushed a commit to branch tpollard/notificationhandlertmp
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 0106e1bfe9d11df7690078bf454519e73407a3e2
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Mon Aug 12 15:04:57 2019 +0100
switching to queue
---
src/buildstream/_scheduler/scheduler.py | 26 +++++++++++++++++++-------
src/buildstream/_stream.py | 17 +++++++++++++++--
2 files changed, 34 insertions(+), 9 deletions(-)
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index e2a180c..77eca25 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -26,6 +26,7 @@ from itertools import chain
import signal
import datetime
from contextlib import contextmanager
+import time
# Local imports
from .resources import Resources, ResourceType
@@ -95,7 +96,7 @@ class Notification:
class Scheduler():
def __init__(self, context,
- start_time, state, notification_handler,
+ start_time, state, notification_queue, notifier,
interrupt_callback=None,
ticker_callback=None):
@@ -126,8 +127,9 @@ class Scheduler():
self._cleanup_scheduled = False # Whether we have a cleanup job scheduled
self._cleanup_running = None # A running CleanupJob, or None
- # Callback to send notifications to report back to the Scheduler's owner
- self.notify = notification_handler
+ # Message to send notifications back to the Scheduler's owner
+ self._notification_queue = notification_queue
+ self._notifier = notifier
# Whether our exclusive jobs, like 'cleanup' are currently already
# waiting or active.
@@ -294,7 +296,7 @@ class Scheduler():
job_action=job.action_name,
job_status=status,
element=element)
- self.notify(notification)
+ self._notify(notification)
self._sched()
# check_cache_size():
@@ -355,7 +357,7 @@ class Scheduler():
full_name=job.name,
job_action=job.action_name,
elapsed_time=self.elapsed_time())
- self.notify(notification)
+ self._notify(notification)
job.start()
# Callback for the cache size job
@@ -575,7 +577,7 @@ class Scheduler():
return
notification = Notification(NotificationType.INTERRUPT)
- self.notify(notification)
+ self._notify(notification)
# _terminate_event():
#
@@ -635,9 +637,19 @@ class Scheduler():
# Regular timeout for driving status in the UI
def _tick(self):
notification = Notification(NotificationType.TICK)
- self.notify(notification)
+ self._notify(notification)
self.loop.call_later(1, self._tick)
+ def _notify(self, notification):
+ self._notification_queue.put(notification)
+ x = 0
+ while self._notification_queue.empty():
+ time.sleep(0.1)
+ x = x +1
+ if x == 10:
+ raise ValueError("queue still empty")
+ self._notifier()
+
def __getstate__(self):
# The only use-cases for pickling in BuildStream at the time of writing
# are enabling the 'spawn' method of starting child processes, and
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index c97cf28..ee5a48b 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -28,8 +28,10 @@ import shlex
import shutil
import tarfile
import tempfile
+import multiprocessing as mp
from contextlib import contextmanager, suppress
from fnmatch import fnmatch
+import queue
from ._artifact import Artifact
from ._artifactelement import verify_artifact_ref, ArtifactElement
@@ -80,10 +82,12 @@ class Stream():
self._project = None
self._pipeline = None
self._state = State(session_start) # Owned by Stream, used by Core to set state
+ self._notification_queue = mp.Queue()
context.messenger.set_state(self._state)
- self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler,
+ self._scheduler = Scheduler(context, session_start, self._state, self._notification_queue,
+ self._scheduler_notification_handler,
interrupt_callback=interrupt_callback,
ticker_callback=ticker_callback)
self._first_non_track_queue = None
@@ -1586,7 +1590,16 @@ class Stream():
return element_targets, artifact_refs
- def _scheduler_notification_handler(self, notification):
+ def _scheduler_notification_handler(self):
+ # Check the queue is there and a scheduler is running
+ assert self._notification_queue
+ notification = None
+ #while notification is None:
+ #try:
+ notification = self._notification_queue.get_nowait()
+ #except queue.Empty:
+ # pass
+
if notification.notification_type == NotificationType.INTERRUPT:
self._interrupt_callback()
elif notification.notification_type == NotificationType.TICK:
[buildstream] 02/05: Add workaround for buildqueue job_complete direct callback
Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
root pushed a commit to branch tpollard/notificationhandlertmp
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 8f844bc3dc2ccb32d308a29e78305e76c70e6818
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Tue Jun 18 12:34:22 2019 +0100
Add workaround for buildqueue job_complete direct callback
---
src/buildstream/_scheduler/scheduler.py | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 0ed5ada..c659452 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -279,11 +279,14 @@ class Scheduler():
# queue (Queue): The Queue holding a complete job
# job (Job): The completed Job
# status (JobStatus): The status of the completed job
+ # process_jobs (bool): If the scheduler should also process the
+ # job, else just generate the notification
#
- def job_completed(self, job, status):
+ def job_completed(self, job, status, process_jobs=True):
- # Remove from the active jobs list
- self._active_jobs.remove(job)
+ if process_jobs:
+ # Remove from the active jobs list
+ self._active_jobs.remove(job)
if status == JobStatus.FAIL:
# If it's an elementjob, we want to compare against the failure messages
@@ -297,8 +300,9 @@ class Scheduler():
element=element)
self.message(message)
- # Now check for more jobs
- self._sched()
+ if process_jobs:
+ # Now check for more jobs
+ self._sched()
# check_cache_size():
#
[buildstream] 04/05: fixups
Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
root pushed a commit to branch tpollard/notificationhandlertmp
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 998fd07484c609e325ea9d533c7a2358cbe8d41a
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Tue Aug 6 16:28:39 2019 +0100
fixups
---
src/buildstream/_scheduler/scheduler.py | 27 ++++++++++-----------------
src/buildstream/_stream.py | 7 ++-----
2 files changed, 12 insertions(+), 22 deletions(-)
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 2820c28..e2a180c 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -279,30 +279,23 @@ class Scheduler():
# queue (Queue): The Queue holding a complete job
# job (Job): The completed Job
# status (JobStatus): The status of the completed job
- # process_jobs (bool): If the scheduler should also process the
- # job, else just generate the notification
#
- def job_completed(self, job, status, process_jobs=True):
-
- if process_jobs:
- # Remove from the active jobs list
- self._active_jobs.remove(job)
+ def job_completed(self, job, status):
+ self._active_jobs.remove(job)
+ element = None
if status == JobStatus.FAIL:
# If it's an elementjob, we want to compare against the failure messages
# and send the Element() instance. Note this will change if the frontend
# is run in a separate process for pickling
element = job.get_element()
- message = Notification(NotificationType.JOB_COMPLETE,
- full_name=job.name,
- job_action=job.action_name,
- job_status=status,
- element=element)
- self.message(message)
-
- if process_jobs:
- # Now check for more jobs
- self._sched()
+ notification = Notification(NotificationType.JOB_COMPLETE,
+ full_name=job.name,
+ job_action=job.action_name,
+ job_status=status,
+ element=element)
+ self.notify(notification)
+ self._sched()
# check_cache_size():
#
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index b09521b..c97cf28 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1597,11 +1597,8 @@ class Stream():
elif notification.notification_type == NotificationType.JOB_COMPLETE:
self._state.remove_task(notification.job_action, notification.full_name)
if notification.job_status == JobStatus.FAIL:
- if notification.element:
- unique_id = notification.full_name
- else:
- unique_id = None
- self._state.fail_task(notification.job_action, notification.full_name, unique_id)
+ self._state.fail_task(notification.job_action, notification.full_name,
+ notification.element)
else:
raise StreamError("Unreccognised notification type recieved")
[buildstream] 03/05: Refer to stream-scheduler communication as notifications
Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
root pushed a commit to branch tpollard/notificationhandlertmp
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit c6420d4d5923efd5a6f592ca48cda728bffc4c64
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Tue Jun 18 13:02:15 2019 +0100
Refer to stream-scheduler communication as notifications
---
src/buildstream/_scheduler/scheduler.py | 28 ++++++++++++++--------------
1 file changed, 14 insertions(+), 14 deletions(-)
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index c659452..2820c28 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -95,7 +95,7 @@ class Notification:
class Scheduler():
def __init__(self, context,
- start_time, state, message_handler,
+ start_time, state, notification_handler,
interrupt_callback=None,
ticker_callback=None):
@@ -126,8 +126,8 @@ class Scheduler():
self._cleanup_scheduled = False # Whether we have a cleanup job scheduled
self._cleanup_running = None # A running CleanupJob, or None
- # Callback to send messages to report back to the Scheduler's owner
- self.message = message_handler
+ # Callback to send notifications to report back to the Scheduler's owner
+ self.notify = notification_handler
# Whether our exclusive jobs, like 'cleanup' are currently already
# waiting or active.
@@ -293,12 +293,12 @@ class Scheduler():
# and send the Element() instance. Note this will change if the frontend
# is run in a separate process for pickling
element = job.get_element()
- message = Notification(NotificationType.JOB_COMPLETE,
+ message = Notification(NotificationType.JOB_COMPLETE,
full_name=job.name,
job_action=job.action_name,
job_status=status,
element=element)
- self.message(message)
+ self.message(message)
if process_jobs:
# Now check for more jobs
@@ -358,11 +358,11 @@ class Scheduler():
#
def _start_job(self, job):
self._active_jobs.append(job)
- message = Notification(NotificationType.JOB_START,
- full_name=job.name,
- job_action=job.action_name,
- elapsed_time=self.elapsed_time())
- self.message(message)
+ notification = Notification(NotificationType.JOB_START,
+ full_name=job.name,
+ job_action=job.action_name,
+ elapsed_time=self.elapsed_time())
+ self.notify(notification)
job.start()
# Callback for the cache size job
@@ -581,8 +581,8 @@ class Scheduler():
if self.terminated:
return
- message = Notification(NotificationType.INTERRUPT)
- self.message(message)
+ notification = Notification(NotificationType.INTERRUPT)
+ self.notify(notification)
# _terminate_event():
#
@@ -641,8 +641,8 @@ class Scheduler():
# Regular timeout for driving status in the UI
def _tick(self):
- message = Notification(NotificationType.TICK)
- self.message(message)
+ notification = Notification(NotificationType.TICK)
+ self.notify(notification)
self.loop.call_later(1, self._tick)
def __getstate__(self):