You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nuttx.apache.org by xi...@apache.org on 2021/07/28 04:01:46 UTC

[incubator-nuttx] 02/04: work_queue: schedule the work queue using the timer mechanism

This is an automated email from the ASF dual-hosted git repository.

xiaoxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nuttx.git

commit 855c78bb9dc48d4f2655257fb7ee01ff9c456cbc
Author: Jiuzhu Dong <do...@xiaomi.com>
AuthorDate: Sat Jun 19 17:29:30 2021 +0800

    work_queue: schedule the work queue using the timer mechanism
    
    Signed-off-by: Jiuzhu Dong <do...@xiaomi.com>
---
 include/nuttx/wqueue.h       |  35 ++----
 sched/wqueue/Make.defs       |   3 +-
 sched/wqueue/kwork_cancel.c  |  17 +--
 sched/wqueue/kwork_process.c | 271 -------------------------------------------
 sched/wqueue/kwork_queue.c   | 130 ++++++++++-----------
 sched/wqueue/kwork_signal.c  | 111 ------------------
 sched/wqueue/kwork_thread.c  | 103 ++++++++++++----
 sched/wqueue/wqueue.h        |  34 ++----
 8 files changed, 174 insertions(+), 530 deletions(-)

diff --git a/include/nuttx/wqueue.h b/include/nuttx/wqueue.h
index e78c46c..5a2d221 100644
--- a/include/nuttx/wqueue.h
+++ b/include/nuttx/wqueue.h
@@ -32,6 +32,7 @@
 #include <queue.h>
 
 #include <nuttx/clock.h>
+#include <nuttx/wdog.h>
 
 /****************************************************************************
  * Pre-processor Definitions
@@ -244,11 +245,17 @@ typedef CODE void (*worker_t)(FAR void *arg);
 
 struct work_s
 {
-  struct dq_entry_s dq;  /* Implements a doubly linked list */
-  worker_t  worker;      /* Work callback */
-  FAR void *arg;         /* Callback argument */
-  clock_t qtime;         /* Time work queued */
-  clock_t delay;         /* Delay until work performed */
+  union
+  {
+    struct
+    {
+      struct sq_entry_s sq; /* Implements a singly linked list */
+      clock_t qtime;        /* Time work queued */
+    } s;
+    struct wdog_s timer;    /* Delay expiry timer */
+  } u;
+  worker_t  worker;         /* Work callback */
+  FAR void *arg;            /* Callback argument */
 };
 
 /* This is an enumeration of the various events that may be
@@ -375,24 +382,6 @@ int work_queue(int qid, FAR struct work_s *work, worker_t worker,
 int work_cancel(int qid, FAR struct work_s *work);
 
 /****************************************************************************
- * Name: work_signal
- *
- * Description:
- *   Signal the worker thread to process the work queue now.  This function
- *   is used internally by the work logic but could also be used by the
- *   user to force an immediate re-assessment of pending work.
- *
- * Input Parameters:
- *   qid    - The work queue ID
- *
- * Returned Value:
- *   Zero on success, a negated errno on failure
- *
- ****************************************************************************/
-
-int work_signal(int qid);
-
-/****************************************************************************
  * Name: work_available
  *
  * Description:
diff --git a/sched/wqueue/Make.defs b/sched/wqueue/Make.defs
index bc31d53..52fc34b 100644
--- a/sched/wqueue/Make.defs
+++ b/sched/wqueue/Make.defs
@@ -22,8 +22,7 @@
 
 ifeq ($(CONFIG_SCHED_WORKQUEUE),y)
 
-CSRCS += kwork_queue.c kwork_process.c kwork_cancel.c kwork_signal.c
-CSRCS += kwork_thread.c
+CSRCS += kwork_queue.c kwork_thread.c kwork_cancel.c
 
 ifeq ($(CONFIG_PRIORITY_INHERITANCE),y)
 CSRCS += kwork_inherit.c
diff --git a/sched/wqueue/kwork_cancel.c b/sched/wqueue/kwork_cancel.c
index 69886b9..b5936b5 100644
--- a/sched/wqueue/kwork_cancel.c
+++ b/sched/wqueue/kwork_cancel.c
@@ -77,18 +77,19 @@ static int work_qcancel(FAR struct kwork_wqueue_s *wqueue,
   flags = enter_critical_section();
   if (work->worker != NULL)
     {
-      /* A little test of the integrity of the work queue */
-
-      DEBUGASSERT(work->dq.flink != NULL ||
-                  (FAR dq_entry_t *)work == wqueue->q.tail);
-      DEBUGASSERT(work->dq.blink != NULL ||
-                  (FAR dq_entry_t *)work == wqueue->q.head);
-
       /* Remove the entry from the work queue and make sure that it is
        * marked as available (i.e., the worker field is nullified).
        */
 
-      dq_rem((FAR dq_entry_t *)work, &wqueue->q);
+      if (WDOG_ISACTIVE(&work->u.timer))
+        {
+          wd_cancel(&work->u.timer);
+        }
+      else
+        {
+          sq_rem((FAR sq_entry_t *)work, &wqueue->q);
+        }
+
       work->worker = NULL;
       ret = OK;
     }
diff --git a/sched/wqueue/kwork_process.c b/sched/wqueue/kwork_process.c
deleted file mode 100644
index 9bec97e..0000000
--- a/sched/wqueue/kwork_process.c
+++ /dev/null
@@ -1,271 +0,0 @@
-/****************************************************************************
- * sched/wqueue/kwork_process.c
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The
- * ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
- * License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
- * License for the specific language governing permissions and limitations
- * under the License.
- *
- ****************************************************************************/
-
-/****************************************************************************
- * Included Files
- ****************************************************************************/
-
-#include <nuttx/config.h>
-
-#include <debug.h>
-#include <stdint.h>
-#include <unistd.h>
-#include <signal.h>
-#include <assert.h>
-#include <queue.h>
-
-#include <nuttx/irq.h>
-#include <nuttx/clock.h>
-#include <nuttx/signal.h>
-#include <nuttx/wqueue.h>
-
-#include "wqueue/wqueue.h"
-
-#ifdef CONFIG_SCHED_WORKQUEUE
-
-/****************************************************************************
- * Pre-processor Definitions
- ****************************************************************************/
-
-/* Use CLOCK_MONOTONIC if it is available.  CLOCK_REALTIME can cause bad
- * delays if the time is changed.
- */
-
-#ifdef CONFIG_CLOCK_MONOTONIC
-#  define WORK_CLOCK CLOCK_MONOTONIC
-#else
-#  define WORK_CLOCK CLOCK_REALTIME
-#endif
-
-#ifdef CONFIG_SYSTEM_TIME64
-#  define WORK_DELAY_MAX UINT64_MAX
-#else
-#  define WORK_DELAY_MAX UINT32_MAX
-#endif
-
-#ifndef MIN
-#  define MIN(a,b) ((a) < (b) ? (a) : (b))
-#endif
-
-#ifndef CONFIG_SCHED_CRITMONITOR_MAXTIME_WQUEUE
-#  define CONFIG_SCHED_CRITMONITOR_MAXTIME_WQUEUE 0
-#endif
-
-#if CONFIG_SCHED_CRITMONITOR_MAXTIME_WQUEUE > 0
-#  define CALL_WORKER(worker, arg) \
-     do \
-       { \
-         uint32_t start; \
-         uint32_t elapsed; \
-         start = up_critmon_gettime(); \
-         worker(arg); \
-         elapsed = up_critmon_gettime() - start; \
-         if (elapsed > CONFIG_SCHED_CRITMONITOR_MAXTIME_WQUEUE) \
-           { \
-             serr("WORKER %p execute too long %"PRIu32"\n", \
-                   worker, elapsed); \
-           } \
-       } \
-     while (0)
-#else
-#  define CALL_WORKER(worker, arg) worker(arg)
-#endif
-
-/****************************************************************************
- * Public Functions
- ****************************************************************************/
-
-/****************************************************************************
- * Name: work_process
- *
- * Description:
- *   This is the logic that performs actions placed on any work list.  This
- *   logic is the common underlying logic to all work queues.  This logic is
- *   part of the internal implementation of each work queue; it should not
- *   be called from application level logic.
- *
- * Input Parameters:
- *   wqueue - Describes the work queue to be processed
- *
- * Returned Value:
- *   None
- *
- ****************************************************************************/
-
-void work_process(FAR struct kwork_wqueue_s *wqueue, int wndx)
-{
-  volatile FAR struct work_s *work;
-  worker_t  worker;
-  irqstate_t flags;
-  FAR void *arg;
-  clock_t elapsed;
-  clock_t remaining;
-  clock_t stick;
-  clock_t ctick;
-  clock_t next;
-
-  /* Then process queued work.  We need to keep interrupts disabled while
-   * we process items in the work list.
-   */
-
-  next  = WORK_DELAY_MAX;
-  flags = enter_critical_section();
-
-  /* Get the time that we started processing the queue in clock ticks. */
-
-  stick = clock_systime_ticks();
-
-  /* And check each entry in the work queue.  Since we have disabled
-   * interrupts we know:  (1) we will not be suspended unless we do
-   * so ourselves, and (2) there will be no changes to the work queue
-   */
-
-  work = (FAR struct work_s *)wqueue->q.head;
-  while (work != NULL)
-    {
-      /* Is this work ready?  It is ready if there is no delay or if
-       * the delay has elapsed. qtime is the time that the work was added
-       * to the work queue.  It will always be greater than or equal to
-       * zero.  Therefore a delay of zero will always execute immediately.
-       */
-
-      ctick   = clock_systime_ticks();
-      elapsed = ctick - work->qtime;
-      if (elapsed >= work->delay)
-        {
-          /* Remove the ready-to-execute work from the list */
-
-          dq_rem((struct dq_entry_s *)work, &wqueue->q);
-
-          /* Extract the work description from the entry (in case the work
-           * instance by the re-used after it has been de-queued).
-           */
-
-          worker = work->worker;
-
-          /* Check for a race condition where the work may be nullified
-           * before it is removed from the queue.
-           */
-
-          if (worker != NULL)
-            {
-              /* Extract the work argument (before re-enabling interrupts) */
-
-              arg = work->arg;
-
-              /* Mark the work as no longer being queued */
-
-              work->worker = NULL;
-
-              /* Do the work.  Re-enable interrupts while the work is being
-               * performed... we don't have any idea how long this will take!
-               */
-
-              leave_critical_section(flags);
-              CALL_WORKER(worker, arg);
-
-              /* Now, unfortunately, since we re-enabled interrupts we don't
-               * know the state of the work list and we will have to start
-               * back at the head of the list.
-               */
-
-              flags = enter_critical_section();
-              work  = (FAR struct work_s *)wqueue->q.head;
-            }
-          else
-            {
-              /* Cancelled.. Just move to the next work in the list with
-               * interrupts still disabled.
-               */
-
-              work = (FAR struct work_s *)work->dq.flink;
-            }
-        }
-      else /* elapsed < work->delay */
-        {
-          /* This one is not ready.
-           *
-           * NOTE that elapsed is relative to the current time,
-           * not the time of beginning of this queue processing pass.
-           * So it may need an adjustment.
-           */
-
-          elapsed += (ctick - stick);
-          if (elapsed > work->delay)
-            {
-              /* The delay has expired while we are processing */
-
-              elapsed = work->delay;
-            }
-
-          /* Will it be ready before the next scheduled wakeup interval? */
-
-          remaining = work->delay - elapsed;
-          if (remaining < next)
-            {
-              /* Yes.. Then schedule to wake up when the work is ready */
-
-              next = remaining;
-            }
-
-          /* Then try the next in the list. */
-
-          work = (FAR struct work_s *)work->dq.flink;
-        }
-    }
-
-  /* When multiple worker threads are created for this work queue, only
-   * thread 0 (wndx = 0) will monitor the unexpired works.
-   *
-   * Other worker threads (wndx > 0) just process no-delay or expired
-   * works, then sleep. The unexpired works are left in the queue. They
-   * will be handled by thread 0 when it finishes current work and iterate
-   * over the queue again.
-   */
-
-  if (wndx > 0 || next == WORK_DELAY_MAX)
-    {
-      sigset_t set;
-
-      /* Wait indefinitely until signalled with SIGWORK */
-
-      sigemptyset(&set);
-      nxsig_addset(&set, SIGWORK);
-
-      wqueue->worker[wndx].busy = false;
-      DEBUGVERIFY(nxsig_waitinfo(&set, NULL));
-      wqueue->worker[wndx].busy = true;
-    }
-  else
-    {
-      /* Wait a while to check the work list.  We will wait here until
-       * either the time elapses or until we are awakened by a signal.
-       * Interrupts will be re-enabled while we wait.
-       */
-
-      wqueue->worker[wndx].busy = false;
-      nxsig_usleep(next * USEC_PER_TICK);
-      wqueue->worker[wndx].busy = true;
-    }
-
-  leave_critical_section(flags);
-}
-
-#endif /* CONFIG_SCHED_WORKQUEUE */
diff --git a/sched/wqueue/kwork_queue.c b/sched/wqueue/kwork_queue.c
index 17fedb2..1b868f1 100644
--- a/sched/wqueue/kwork_queue.c
+++ b/sched/wqueue/kwork_queue.c
@@ -43,73 +43,32 @@
  ****************************************************************************/
 
 /****************************************************************************
- * Name: work_qqueue
- *
- * Description:
- *   Queue work to be performed at a later time.  All queued work will be
- *   performed on the worker thread of execution (not the caller's).
- *
- *   The work structure is allocated by caller, but completely managed by
- *   the work queue logic.  The caller should never modify the contents of
- *   the work queue structure; the caller should not call work_qqueue()
- *   again until either (1) the previous work has been performed and removed
- *   from the queue, or (2) work_cancel() has been called to cancel the work
- *   and remove it from the work queue.
- *
- * Input Parameters:
- *   qid    - The work queue ID (index)
- *   work   - The work structure to queue
- *   worker - The worker callback to be invoked.  The callback will be
- *            invoked on the worker thread of execution.
- *   arg    - The argument that will be passed to the worker callback when
- *            int is invoked.
- *   delay  - Delay (in clock ticks) from the time queue until the worker
- *            is invoked. Zero means to perform the work immediately.
- *
- * Returned Value:
- *   None
- *
+ * Name: hp_work_timer_expiry
  ****************************************************************************/
 
-static void work_qqueue(FAR struct kwork_wqueue_s *wqueue,
-                        FAR struct work_s *work, worker_t worker,
-                        FAR void *arg, clock_t delay)
+#ifdef CONFIG_SCHED_HPWORK
+static void hp_work_timer_expiry(wdparm_t arg)
 {
-  irqstate_t flags;
-
-  DEBUGASSERT(work != NULL && worker != NULL);
-
-  /* Interrupts are disabled so that this logic can be called from with
-   * task logic or ifrom nterrupt handling logic.
-   */
-
-  flags = enter_critical_section();
-
-  /* Is there already pending work? */
-
-  if (work->worker != NULL)
-    {
-      /* Remove the entry from the work queue.  It will be requeued at the
-       * end of the work queue.
-       */
-
-      dq_rem((FAR dq_entry_t *)work, &wqueue->q);
-    }
-
-  /* Initialize the work structure. */
-
-  work->worker = worker;           /* Work callback. non-NULL means queued */
-  work->arg    = arg;              /* Callback argument */
-  work->delay  = delay;            /* Delay until work performed */
-
-  /* Now, time-tag that entry and put it in the work queue */
-
-  work->qtime  = clock_systime_ticks(); /* Time work queued */
+  irqstate_t flags = enter_critical_section();
+  sq_addlast((FAR sq_entry_t *)arg, &g_hpwork.q);
+  nxsem_post(&g_hpwork.sem);
+  leave_critical_section(flags);
+}
+#endif
 
-  dq_addlast((FAR dq_entry_t *)work, &wqueue->q);
+/****************************************************************************
+ * Name: lp_work_timer_expiry
+ ****************************************************************************/
 
+#ifdef CONFIG_SCHED_LPWORK
+static void lp_work_timer_expiry(wdparm_t arg)
+{
+  irqstate_t flags = enter_critical_section();
+  sq_addlast((FAR sq_entry_t *)arg, &g_lpwork.q);
+  nxsem_post(&g_lpwork.sem);
   leave_critical_section(flags);
 }
+#endif
 
 /****************************************************************************
  * Public Functions
@@ -148,6 +107,23 @@ static void work_qqueue(FAR struct kwork_wqueue_s *wqueue,
 int work_queue(int qid, FAR struct work_s *work, worker_t worker,
                FAR void *arg, clock_t delay)
 {
+  irqstate_t flags;
+
+  /* Remove the entry from the timer and work queue. */
+
+  work_cancel(qid, work);
+
+  /* Interrupts are disabled so that this logic can be called from within
+   * task logic or from interrupt handling logic.
+   */
+
+  flags = enter_critical_section();
+
+  /* Initialize the work structure. */
+
+  work->worker = worker;           /* Work callback. non-NULL means queued */
+  work->arg = arg;                 /* Callback argument */
+
   /* Queue the new work */
 
 #ifdef CONFIG_SCHED_HPWORK
@@ -155,9 +131,16 @@ int work_queue(int qid, FAR struct work_s *work, worker_t worker,
     {
       /* Queue high priority work */
 
-      work_qqueue((FAR struct kwork_wqueue_s *)&g_hpwork, work, worker,
-                  arg, delay);
-      return work_signal(HPWORK);
+      if (!delay)
+        {
+          sq_addlast((FAR sq_entry_t *)work, &g_hpwork.q);
+          nxsem_post(&g_hpwork.sem);
+        }
+      else
+        {
+          wd_start(&work->u.timer, delay, hp_work_timer_expiry,
+                   (wdparm_t)work);
+        }
     }
   else
 #endif
@@ -166,15 +149,22 @@ int work_queue(int qid, FAR struct work_s *work, worker_t worker,
     {
       /* Queue low priority work */
 
-      work_qqueue((FAR struct kwork_wqueue_s *)&g_lpwork, work, worker,
-                  arg, delay);
-      return work_signal(LPWORK);
+      if (!delay)
+        {
+          sq_addlast((FAR sq_entry_t *)work, &g_lpwork.q);
+          nxsem_post(&g_lpwork.sem);
+        }
+      else
+        {
+          wd_start(&work->u.timer, delay, lp_work_timer_expiry,
+                   (wdparm_t)work);
+        }
     }
-  else
 #endif
-    {
-      return -EINVAL;
-    }
+
+  leave_critical_section(flags);
+
+  return OK;
 }
 
 #endif /* CONFIG_SCHED_WORKQUEUE */
diff --git a/sched/wqueue/kwork_signal.c b/sched/wqueue/kwork_signal.c
deleted file mode 100644
index 0f2084f..0000000
--- a/sched/wqueue/kwork_signal.c
+++ /dev/null
@@ -1,111 +0,0 @@
-/****************************************************************************
- * sched/wqueue/kwork_signal.c
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The
- * ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
- * License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
- * License for the specific language governing permissions and limitations
- * under the License.
- *
- ****************************************************************************/
-
-/****************************************************************************
- * Included Files
- ****************************************************************************/
-
-#include <nuttx/config.h>
-
-#include <signal.h>
-#include <errno.h>
-
-#include <nuttx/wqueue.h>
-#include <nuttx/signal.h>
-
-#include "wqueue/wqueue.h"
-
-#ifdef CONFIG_SCHED_WORKQUEUE
-
-/****************************************************************************
- * Public Functions
- ****************************************************************************/
-
-/****************************************************************************
- * Name: work_signal
- *
- * Description:
- *   Signal the worker thread to process the work queue now.  This function
- *   is used internally by the work logic but could also be used by the
- *   user to force an immediate re-assessment of pending work.
- *
- * Input Parameters:
- *   qid    - The work queue ID
- *
- * Returned Value:
- *   Zero (OK) on success, a negated errno value on failure
- *
- ****************************************************************************/
-
-int work_signal(int qid)
-{
-  FAR struct kwork_wqueue_s *work;
-  int threads;
-  int i;
-
-  /* Get the process ID of the worker thread */
-
-#ifdef CONFIG_SCHED_HPWORK
-  if (qid == HPWORK)
-    {
-      work = (FAR struct kwork_wqueue_s *)&g_hpwork;
-      threads = CONFIG_SCHED_HPNTHREADS;
-    }
-  else
-#endif
-#ifdef CONFIG_SCHED_LPWORK
-  if (qid == LPWORK)
-    {
-      work = (FAR struct kwork_wqueue_s *)&g_lpwork;
-      threads = CONFIG_SCHED_LPNTHREADS;
-    }
-  else
-#endif
-    {
-      return -EINVAL;
-    }
-
-  /* Find an IDLE worker thread */
-
-  for (i = 0; i < threads; i++)
-    {
-      /* Is this worker thread busy? */
-
-      if (!work->worker[i].busy)
-        {
-          /* No.. select this thread */
-
-          break;
-        }
-    }
-
-  /* If all of the IDLE threads are busy, then just return successfully */
-
-  if (i >= threads)
-    {
-      return OK;
-    }
-
-  /* Otherwise, signal the first IDLE thread found */
-
-  return nxsig_kill(work->worker[i].pid, SIGWORK);
-}
-
-#endif /* CONFIG_SCHED_WORKQUEUE */
diff --git a/sched/wqueue/kwork_thread.c b/sched/wqueue/kwork_thread.c
index 3c3b7eb..2e48399 100644
--- a/sched/wqueue/kwork_thread.c
+++ b/sched/wqueue/kwork_thread.c
@@ -36,12 +36,37 @@
 
 #include <nuttx/wqueue.h>
 #include <nuttx/kthread.h>
+#include <nuttx/semaphore.h>
 
 #include "wqueue/wqueue.h"
 
 #if defined(CONFIG_SCHED_WORKQUEUE)
 
 /****************************************************************************
+ * Pre-processor Definitions
+ ****************************************************************************/
+
+#if defined(CONFIG_SCHED_CRITMONITOR_MAXTIME_WQUEUE) && CONFIG_SCHED_CRITMONITOR_MAXTIME_WQUEUE > 0
+#  define CALL_WORKER(worker, arg) \
+     do \
+       { \
+         uint32_t start; \
+         uint32_t elapsed; \
+         start = up_critmon_gettime(); \
+         worker(arg); \
+         elapsed = up_critmon_gettime() - start; \
+         if (elapsed > CONFIG_SCHED_CRITMONITOR_MAXTIME_WQUEUE) \
+           { \
+             serr("WORKER %p execute too long %"PRIu32"\n", \
+                   worker, elapsed); \
+           } \
+       } \
+     while (0)
+#else
+#  define CALL_WORKER(worker, arg) worker(arg)
+#endif
+
+/****************************************************************************
  * Public Data
  ****************************************************************************/
 
@@ -65,45 +90,84 @@ struct lp_wqueue_s g_lpwork;
  * Name: work_thread
  *
  * Description:
- *   These are the worker threads that performs the actions placed on the
+ *   These are the worker threads that perform the actions placed on the
  *   high priority work queue.
  *
  *   These, along with the lower priority worker thread(s) are the kernel
- *   mode work queues (also build in the flat build).
+ *   mode work queues (also built in the flat build).
  *
  *   All kernel mode worker threads are started by the OS during normal
  *   bring up.  This entry point is referenced by OS internally and should
  *   not be accessed by application logic.
  *
  * Input Parameters:
- *   argc, argv (not used)
+ *   argc, argv
  *
  * Returned Value:
  *   Does not return
  *
  ****************************************************************************/
 
-static int work_thread(int argc, char *argv[])
+static int work_thread(int argc, FAR char *argv[])
 {
-  FAR struct kwork_wqueue_s *queue;
-  int wndx;
+  FAR struct kwork_wqueue_s *wqueue;
+  FAR struct work_s *work;
+  worker_t  worker;
+  irqstate_t flags;
+  FAR void *arg;
 
-  queue  = (FAR struct kwork_wqueue_s *)
+  wqueue = (FAR struct kwork_wqueue_s *)
            ((uintptr_t)strtoul(argv[1], NULL, 0));
-  wndx   = atoi(argv[2]);
+
+  flags = enter_critical_section();
 
   /* Loop forever */
 
   for (; ; )
     {
       /* Then process queued work.  work_process will not return until: (1)
-       * there is no further work in the work queue, and (2) signal is
-       * triggered, or delayed work expires.
+       * there is no further work in the work queue, and (2) semaphore is
+       * posted.
+       */
+
+      nxsem_wait_uninterruptible(&wqueue->sem);
+
+      /* Take the next entry from the work queue.  Since we have disabled
+       * interrupts we know:  (1) we will not be suspended unless we do
+       * so ourselves, and (2) there will be no changes to the work queue
        */
 
-      work_process(queue, wndx);
+      /* Remove the ready-to-execute work from the list */
+
+      work = (FAR struct work_s *)sq_remfirst(&wqueue->q);
+      if (work && work->worker)
+        {
+          /* Extract the work description from the entry (in case the work
+           * instance will be re-used after it has been de-queued).
+           */
+
+          worker = work->worker;
+
+          /* Extract the work argument (before re-enabling interrupts) */
+
+          arg = work->arg;
+
+          /* Mark the work as no longer being queued */
+
+          work->worker = NULL;
+
+          /* Do the work.  Re-enable interrupts while the work is being
+           * performed... we don't have any idea how long this will take!
+           */
+
+          leave_critical_section(flags);
+          CALL_WORKER(worker, arg);
+          flags = enter_critical_section();
+        }
     }
 
+  leave_critical_section(flags);
+
   return OK; /* To keep some compilers happy */
 }
 
@@ -130,14 +194,17 @@ static int work_thread_create(FAR const char *name, int priority,
                               int stack_size, int nthread,
                               FAR struct kwork_wqueue_s *wqueue)
 {
-  FAR char *argv[3];
-  char args[2][16];
+  FAR char *argv[2];
+  char args[16];
   int wndx;
   int pid;
 
-  snprintf(args[0], 16, "0x%" PRIxPTR, (uintptr_t)wqueue);
-  argv[0] = args[0];
-  argv[2] = NULL;
+  snprintf(args, 16, "0x%" PRIxPTR, (uintptr_t)wqueue);
+  argv[0] = args;
+  argv[1] = NULL;
+
+  nxsem_init(&wqueue->sem, 0, 0);
+  nxsem_set_protocol(&wqueue->sem, SEM_PRIO_NONE);
 
   /* Don't permit any of the threads to run until we have fully initialized
    * g_hpwork and g_lpwork.
@@ -147,9 +214,6 @@ static int work_thread_create(FAR const char *name, int priority,
 
   for (wndx = 0; wndx < nthread; wndx++)
     {
-      snprintf(args[1], 16, "%d", wndx);
-      argv[1] = args[1];
-
       pid = kthread_create(name, priority, stack_size,
                            (main_t)work_thread, argv);
 
@@ -164,7 +228,6 @@ static int work_thread_create(FAR const char *name, int priority,
 #ifdef CONFIG_PRIORITY_INHERITANCE
       wqueue->worker[wndx].pid  = pid;
 #endif
-      wqueue->worker[wndx].busy = true;
     }
 
   sched_unlock();
diff --git a/sched/wqueue/wqueue.h b/sched/wqueue/wqueue.h
index 106cb2d..774cebc 100644
--- a/sched/wqueue/wqueue.h
+++ b/sched/wqueue/wqueue.h
@@ -51,15 +51,17 @@
 
 struct kworker_s
 {
-  pid_t             pid;    /* The task ID of the worker thread */
-  volatile bool     busy;   /* True: Worker is not available */
+#ifdef CONFIG_PRIORITY_INHERITANCE
+  pid_t             pid;       /* The task ID of the worker thread */
+#endif
 };
 
 /* This structure defines the state of one kernel-mode work queue */
 
 struct kwork_wqueue_s
 {
-  struct dq_queue_s q;         /* The queue of pending work */
+  struct sq_queue_s q;         /* The queue of pending work */
+  sem_t             sem;       /* The counting semaphore of the wqueue */
   struct kworker_s  worker[1]; /* Describes a worker thread */
 };
 
@@ -70,7 +72,8 @@ struct kwork_wqueue_s
 #ifdef CONFIG_SCHED_HPWORK
 struct hp_wqueue_s
 {
-  struct dq_queue_s q;         /* The queue of pending work */
+  struct sq_queue_s q;         /* The queue of pending work */
+  sem_t             sem;       /* The counting semaphore of the wqueue */
 
   /* Describes each thread in the high priority queue's thread pool */
 
@@ -85,7 +88,8 @@ struct hp_wqueue_s
 #ifdef CONFIG_SCHED_LPWORK
 struct lp_wqueue_s
 {
-  struct dq_queue_s q;      /* The queue of pending work */
+  struct sq_queue_s q;         /* The queue of pending work */
+  sem_t             sem;       /* The counting semaphore of the wqueue */
 
   /* Describes each thread in the low priority queue's thread pool */
 
@@ -152,26 +156,6 @@ int work_start_lowpri(void);
 #endif
 
 /****************************************************************************
- * Name: work_process
- *
- * Description:
- *   This is the logic that performs actions placed on any work list.  This
- *   logic is the common underlying logic to all work queues.  This logic is
- *   part of the internal implementation of each work queue; it should not
- *   be called from application level logic.
- *
- * Input Parameters:
- *   wqueue - Describes the work queue to be processed
- *   wndx   - The worker thread index
- *
- * Returned Value:
- *   None
- *
- ****************************************************************************/
-
-void work_process(FAR struct kwork_wqueue_s *wqueue, int wndx);
-
-/****************************************************************************
  * Name: work_initialize_notifier
  *
  * Description: