You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by ro...@apache.org on 2020/12/29 13:50:01 UTC

[buildstream] branch tpollard/notificationhandlertmp created (now 0106e1b)

This is an automated email from the ASF dual-hosted git repository.

root pushed a change to branch tpollard/notificationhandlertmp
in repository https://gitbox.apache.org/repos/asf/buildstream.git.


      at 0106e1b  switching to queue

This branch includes the following new commits:

     new 7f7c55f  WIP: Refactor scheduler-frontend communication
     new 8f844bc  Add workaround for buildqueue job_complete direct callback
     new c6420d4  Refer to stream-scheduler communication as notifications
     new 998fd07  fixups
     new 0106e1b  switching to queue

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[buildstream] 01/05: WIP: Refactor scheduler-frontend communication

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

root pushed a commit to branch tpollard/notificationhandlertmp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 7f7c55f040b8bc42c5c66f0cbbb173d09bef856e
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Thu Jun 13 17:47:04 2019 +0100

    WIP: Refactor scheduler-frontend communication
---
 src/buildstream/_frontend/app.py        |  2 +
 src/buildstream/_scheduler/__init__.py  |  2 +-
 src/buildstream/_scheduler/scheduler.py | 72 +++++++++++++++++++++++++--------
 src/buildstream/_stream.py              | 25 +++++++++++-
 4 files changed, 81 insertions(+), 20 deletions(-)

diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 87575b6..7fe71c5 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -603,6 +603,7 @@ class App():
                 pass
             return
 
+        assert False
         # Interactive mode for element failures
         with self._interrupted():
 
@@ -642,6 +643,7 @@ class App():
 
                 # Handle choices which you can come back from
                 #
+                assert choice != 'shell'  # This won't work for now
                 if choice == 'shell':
                     click.echo("\nDropping into an interactive shell in the failed build sandbox\n", err=True)
                     try:
diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py
index d2f458f..d689d6e 100644
--- a/src/buildstream/_scheduler/__init__.py
+++ b/src/buildstream/_scheduler/__init__.py
@@ -26,5 +26,5 @@ from .queues.buildqueue import BuildQueue
 from .queues.artifactpushqueue import ArtifactPushQueue
 from .queues.pullqueue import PullQueue
 
-from .scheduler import Scheduler, SchedStatus
+from .scheduler import Scheduler, SchedStatus, Notification, NotificationType
 from .jobs import ElementJob, JobStatus
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 9d7cf5d..0ed5ada 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -21,6 +21,7 @@
 # System imports
 import os
 import asyncio
+import enum
 from itertools import chain
 import signal
 import datetime
@@ -45,6 +46,32 @@ _ACTION_NAME_CLEANUP = 'clean'
 _ACTION_NAME_CACHE_SIZE = 'size'
 
 
+@enum.unique
+class NotificationType(enum.Enum):
+    INTERRUPT = "interrupt"
+    JOB_START = "job_start"
+    JOB_COMPLETE = "job_complete"
+    TICK = "tick"
+
+
+class Notification:
+
+    def __init__(self,
+                 notification_type,
+                 *,
+                 full_name=None,
+                 job_action=None,
+                 job_status=None,
+                 elapsed_time=None,
+                 element=None):
+        self.notification_type = notification_type
+        self.full_name = full_name
+        self.job_action = job_action
+        self.job_status = job_status
+        self.elapsed_time = elapsed_time
+        self.element = element
+
+
 # Scheduler()
 #
 # The scheduler operates on a list queues, each of which is meant to accomplish
@@ -68,7 +95,7 @@ _ACTION_NAME_CACHE_SIZE = 'size'
 class Scheduler():
 
     def __init__(self, context,
-                 start_time, state,
+                 start_time, state, message_handler,
                  interrupt_callback=None,
                  ticker_callback=None):
 
@@ -99,9 +126,17 @@ class Scheduler():
         self._cleanup_scheduled = False       # Whether we have a cleanup job scheduled
         self._cleanup_running = None          # A running CleanupJob, or None
 
-        # Callbacks to report back to the Scheduler owner
-        self._interrupt_callback = interrupt_callback
-        self._ticker_callback = ticker_callback
+        # Callback to send messages to report back to the Scheduler's owner
+        self.message = message_handler
+
+        # Whether our exclusive jobs, like 'cleanup' are currently already
+        # waiting or active.
+        #
+        # This is just a bit quicker than scanning the wait queue and active
+        # queue and comparing job action names.
+        #
+        self._exclusive_waiting = set()
+        self._exclusive_active = set()
 
         self.resources = Resources(context.sched_builders,
                                    context.sched_fetchers,
@@ -131,8 +166,7 @@ class Scheduler():
         asyncio.set_event_loop(self.loop)
 
         # Add timeouts
-        if self._ticker_callback:
-            self.loop.call_later(1, self._tick)
+        self.loop.call_later(1, self._tick)
 
         # Handle unix signals while running
         self._connect_signals()
@@ -251,13 +285,17 @@ class Scheduler():
         # Remove from the active jobs list
         self._active_jobs.remove(job)
 
-        self._state.remove_task(job.action_name, job.name)
         if status == JobStatus.FAIL:
             # If it's an elementjob, we want to compare against the failure messages
             # and send the Element() instance. Note this will change if the frontend
             # is run in a separate process for pickling
             element = job.get_element()
-            self._state.fail_task(job.action_name, job.name, element=element)
+            message = Notification(NotificationType.JOB_COMPLETE,
+                                   full_name=job.name,
+                                   job_action=job.action_name,
+                                   job_status=status,
+                                   element=element)
+            self.message(message)
 
         # Now check for more jobs
         self._sched()
@@ -316,7 +354,11 @@ class Scheduler():
     #
     def _start_job(self, job):
         self._active_jobs.append(job)
-        self._state.add_task(job.action_name, job.name, self.elapsed_time())
+        message = Notification(NotificationType.JOB_START,
+                               full_name=job.name,
+                               job_action=job.action_name,
+                               elapsed_time=self.elapsed_time())
+        self.message(message)
         job.start()
 
     # Callback for the cache size job
@@ -535,13 +577,8 @@ class Scheduler():
         if self.terminated:
             return
 
-        # Leave this to the frontend to decide, if no
-        # interrrupt callback was specified, then just terminate.
-        if self._interrupt_callback:
-            self._interrupt_callback()
-        else:
-            # Default without a frontend is just terminate
-            self.terminate_jobs()
+        message = Notification(NotificationType.INTERRUPT)
+        self.message(message)
 
     # _terminate_event():
     #
@@ -600,7 +637,8 @@ class Scheduler():
 
     # Regular timeout for driving status in the UI
     def _tick(self):
-        self._ticker_callback()
+        message = Notification(NotificationType.TICK)
+        self.message(message)
         self.loop.call_later(1, self._tick)
 
     def __getstate__(self):
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index c54fee1..b09521b 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -36,7 +36,7 @@ from ._artifactelement import verify_artifact_ref, ArtifactElement
 from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError
 from ._message import Message, MessageType
 from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
-    SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue
+    SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, JobStatus
 from ._pipeline import Pipeline, PipelineSelection
 from ._profile import Topics, PROFILER
 from ._state import State
@@ -83,11 +83,13 @@ class Stream():
 
         context.messenger.set_state(self._state)
 
-        self._scheduler = Scheduler(context, session_start, self._state,
+        self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler,
                                     interrupt_callback=interrupt_callback,
                                     ticker_callback=ticker_callback)
         self._first_non_track_queue = None
         self._session_start_callback = session_start_callback
+        self._ticker_callback = ticker_callback
+        self._interrupt_callback = interrupt_callback
 
     # init()
     #
@@ -1584,6 +1586,25 @@ class Stream():
 
         return element_targets, artifact_refs
 
+    def _scheduler_notification_handler(self, notification):
+        if notification.notification_type == NotificationType.INTERRUPT:
+            self._interrupt_callback()
+        elif notification.notification_type == NotificationType.TICK:
+            self._ticker_callback()
+        elif notification.notification_type == NotificationType.JOB_START:
+            self._state.add_task(notification.job_action, notification.full_name, notification.elapsed_time)
+
+        elif notification.notification_type == NotificationType.JOB_COMPLETE:
+            self._state.remove_task(notification.job_action, notification.full_name)
+            if notification.job_status == JobStatus.FAIL:
+                if notification.element:
+                    unique_id = notification.full_name
+                else:
+                    unique_id = None
+                self._state.fail_task(notification.job_action, notification.full_name, unique_id)
+        else:
+            raise StreamError("Unreccognised notification type recieved")
+
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing
         # are enabling the 'spawn' method of starting child processes, and


[buildstream] 05/05: switching to queue

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

root pushed a commit to branch tpollard/notificationhandlertmp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 0106e1bfe9d11df7690078bf454519e73407a3e2
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Mon Aug 12 15:04:57 2019 +0100

    switching to queue
---
 src/buildstream/_scheduler/scheduler.py | 26 +++++++++++++++++++-------
 src/buildstream/_stream.py              | 17 +++++++++++++++--
 2 files changed, 34 insertions(+), 9 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index e2a180c..77eca25 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -26,6 +26,7 @@ from itertools import chain
 import signal
 import datetime
 from contextlib import contextmanager
+import time
 
 # Local imports
 from .resources import Resources, ResourceType
@@ -95,7 +96,7 @@ class Notification:
 class Scheduler():
 
     def __init__(self, context,
-                 start_time, state, notification_handler,
+                 start_time, state, notification_queue, notifier,
                  interrupt_callback=None,
                  ticker_callback=None):
 
@@ -126,8 +127,9 @@ class Scheduler():
         self._cleanup_scheduled = False       # Whether we have a cleanup job scheduled
         self._cleanup_running = None          # A running CleanupJob, or None
 
-        # Callback to send notifications to report back to the Scheduler's owner
-        self.notify = notification_handler
+        # Queue and notifier callback used to send notifications back to the Scheduler's owner
+        self._notification_queue = notification_queue
+        self._notifier = notifier
 
         # Whether our exclusive jobs, like 'cleanup' are currently already
         # waiting or active.
@@ -294,7 +296,7 @@ class Scheduler():
                                     job_action=job.action_name,
                                     job_status=status,
                                     element=element)
-        self.notify(notification)
+        self._notify(notification)
         self._sched()
 
     # check_cache_size():
@@ -355,7 +357,7 @@ class Scheduler():
                                     full_name=job.name,
                                     job_action=job.action_name,
                                     elapsed_time=self.elapsed_time())
-        self.notify(notification)
+        self._notify(notification)
         job.start()
 
     # Callback for the cache size job
@@ -575,7 +577,7 @@ class Scheduler():
             return
 
         notification = Notification(NotificationType.INTERRUPT)
-        self.notify(notification)
+        self._notify(notification)
 
     # _terminate_event():
     #
@@ -635,9 +637,19 @@ class Scheduler():
     # Regular timeout for driving status in the UI
     def _tick(self):
         notification = Notification(NotificationType.TICK)
-        self.notify(notification)
+        self._notify(notification)
         self.loop.call_later(1, self._tick)
 
+    def _notify(self, notification):
+        self._notification_queue.put(notification)
+        x = 0
+        while self._notification_queue.empty():
+            time.sleep(0.1)
+            x = x +1
+            if x == 10:
+                raise ValueError("queue still empty")
+        self._notifier()
+
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing
         # are enabling the 'spawn' method of starting child processes, and
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index c97cf28..ee5a48b 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -28,8 +28,10 @@ import shlex
 import shutil
 import tarfile
 import tempfile
+import multiprocessing as mp
 from contextlib import contextmanager, suppress
 from fnmatch import fnmatch
+import queue
 
 from ._artifact import Artifact
 from ._artifactelement import verify_artifact_ref, ArtifactElement
@@ -80,10 +82,12 @@ class Stream():
         self._project = None
         self._pipeline = None
         self._state = State(session_start)  # Owned by Stream, used by Core to set state
+        self._notification_queue = mp.Queue()
 
         context.messenger.set_state(self._state)
 
-        self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler,
+        self._scheduler = Scheduler(context, session_start, self._state, self._notification_queue,
+                                    self._scheduler_notification_handler,
                                     interrupt_callback=interrupt_callback,
                                     ticker_callback=ticker_callback)
         self._first_non_track_queue = None
@@ -1586,7 +1590,16 @@ class Stream():
 
         return element_targets, artifact_refs
 
-    def _scheduler_notification_handler(self, notification):
+    def _scheduler_notification_handler(self):
+        # Check the notification queue exists before reading from it
+        assert self._notification_queue
+        notification = None
+        #while notification is None:
+            #try:
+        notification = self._notification_queue.get_nowait()
+            #except queue.Empty:
+            #    pass
+
         if notification.notification_type == NotificationType.INTERRUPT:
             self._interrupt_callback()
         elif notification.notification_type == NotificationType.TICK:


[buildstream] 02/05: Add workaround for buildqueue job_complete direct callback

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

root pushed a commit to branch tpollard/notificationhandlertmp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 8f844bc3dc2ccb32d308a29e78305e76c70e6818
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Tue Jun 18 12:34:22 2019 +0100

    Add workaround for buildqueue job_complete direct callback
---
 src/buildstream/_scheduler/scheduler.py | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 0ed5ada..c659452 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -279,11 +279,14 @@ class Scheduler():
     #    queue (Queue): The Queue holding a complete job
     #    job (Job): The completed Job
     #    status (JobStatus): The status of the completed job
+    #    process_jobs (bool): If the scheduler should also process the
+    #                         job, else just generate the notification
     #
-    def job_completed(self, job, status):
+    def job_completed(self, job, status, process_jobs=True):
 
-        # Remove from the active jobs list
-        self._active_jobs.remove(job)
+        if process_jobs:
+            # Remove from the active jobs list
+            self._active_jobs.remove(job)
 
         if status == JobStatus.FAIL:
             # If it's an elementjob, we want to compare against the failure messages
@@ -297,8 +300,9 @@ class Scheduler():
                                    element=element)
             self.message(message)
 
-        # Now check for more jobs
-        self._sched()
+        if process_jobs:
+            # Now check for more jobs
+            self._sched()
 
     # check_cache_size():
     #


[buildstream] 04/05: fixups

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

root pushed a commit to branch tpollard/notificationhandlertmp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 998fd07484c609e325ea9d533c7a2358cbe8d41a
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Tue Aug 6 16:28:39 2019 +0100

    fixups
---
 src/buildstream/_scheduler/scheduler.py | 27 ++++++++++-----------------
 src/buildstream/_stream.py              |  7 ++-----
 2 files changed, 12 insertions(+), 22 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 2820c28..e2a180c 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -279,30 +279,23 @@ class Scheduler():
     #    queue (Queue): The Queue holding a complete job
     #    job (Job): The completed Job
     #    status (JobStatus): The status of the completed job
-    #    process_jobs (bool): If the scheduler should also process the
-    #                         job, else just generate the notification
     #
-    def job_completed(self, job, status, process_jobs=True):
-
-        if process_jobs:
-            # Remove from the active jobs list
-            self._active_jobs.remove(job)
+    def job_completed(self, job, status):
+        self._active_jobs.remove(job)
 
+        element = None
         if status == JobStatus.FAIL:
             # If it's an elementjob, we want to compare against the failure messages
             # and send the Element() instance. Note this will change if the frontend
             # is run in a separate process for pickling
             element = job.get_element()
-        message = Notification(NotificationType.JOB_COMPLETE,
-                                   full_name=job.name,
-                                   job_action=job.action_name,
-                                   job_status=status,
-                                   element=element)
-        self.message(message)
-
-        if process_jobs:
-            # Now check for more jobs
-            self._sched()
+        notification = Notification(NotificationType.JOB_COMPLETE,
+                                    full_name=job.name,
+                                    job_action=job.action_name,
+                                    job_status=status,
+                                    element=element)
+        self.notify(notification)
+        self._sched()
 
     # check_cache_size():
     #
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index b09521b..c97cf28 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1597,11 +1597,8 @@ class Stream():
         elif notification.notification_type == NotificationType.JOB_COMPLETE:
             self._state.remove_task(notification.job_action, notification.full_name)
             if notification.job_status == JobStatus.FAIL:
-                if notification.element:
-                    unique_id = notification.full_name
-                else:
-                    unique_id = None
-                self._state.fail_task(notification.job_action, notification.full_name, unique_id)
+                self._state.fail_task(notification.job_action, notification.full_name,
+                                      notification.element)
         else:
             raise StreamError("Unreccognised notification type recieved")
 


[buildstream] 03/05: Refer to stream-scheduler communication as notifications

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

root pushed a commit to branch tpollard/notificationhandlertmp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit c6420d4d5923efd5a6f592ca48cda728bffc4c64
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Tue Jun 18 13:02:15 2019 +0100

    Refer to stream-scheduler communication as notifications
---
 src/buildstream/_scheduler/scheduler.py | 28 ++++++++++++++--------------
 1 file changed, 14 insertions(+), 14 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index c659452..2820c28 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -95,7 +95,7 @@ class Notification:
 class Scheduler():
 
     def __init__(self, context,
-                 start_time, state, message_handler,
+                 start_time, state, notification_handler,
                  interrupt_callback=None,
                  ticker_callback=None):
 
@@ -126,8 +126,8 @@ class Scheduler():
         self._cleanup_scheduled = False       # Whether we have a cleanup job scheduled
         self._cleanup_running = None          # A running CleanupJob, or None
 
-        # Callback to send messages to report back to the Scheduler's owner
-        self.message = message_handler
+        # Callback to send notifications to report back to the Scheduler's owner
+        self.notify = notification_handler
 
         # Whether our exclusive jobs, like 'cleanup' are currently already
         # waiting or active.
@@ -293,12 +293,12 @@ class Scheduler():
             # and send the Element() instance. Note this will change if the frontend
             # is run in a separate process for pickling
             element = job.get_element()
-            message = Notification(NotificationType.JOB_COMPLETE,
+        message = Notification(NotificationType.JOB_COMPLETE,
                                    full_name=job.name,
                                    job_action=job.action_name,
                                    job_status=status,
                                    element=element)
-            self.message(message)
+        self.message(message)
 
         if process_jobs:
             # Now check for more jobs
@@ -358,11 +358,11 @@ class Scheduler():
     #
     def _start_job(self, job):
         self._active_jobs.append(job)
-        message = Notification(NotificationType.JOB_START,
-                               full_name=job.name,
-                               job_action=job.action_name,
-                               elapsed_time=self.elapsed_time())
-        self.message(message)
+        notification = Notification(NotificationType.JOB_START,
+                                    full_name=job.name,
+                                    job_action=job.action_name,
+                                    elapsed_time=self.elapsed_time())
+        self.notify(notification)
         job.start()
 
     # Callback for the cache size job
@@ -581,8 +581,8 @@ class Scheduler():
         if self.terminated:
             return
 
-        message = Notification(NotificationType.INTERRUPT)
-        self.message(message)
+        notification = Notification(NotificationType.INTERRUPT)
+        self.notify(notification)
 
     # _terminate_event():
     #
@@ -641,8 +641,8 @@ class Scheduler():
 
     # Regular timeout for driving status in the UI
     def _tick(self):
-        message = Notification(NotificationType.TICK)
-        self.message(message)
+        notification = Notification(NotificationType.TICK)
+        self.notify(notification)
         self.loop.call_later(1, self._tick)
 
     def __getstate__(self):